diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-21 12:04:15 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-21 12:04:15 +0200 |
| commit | 322d91db03a7d74a00ec623ce624f979c0b77c03 (patch) | |
| tree | 73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-trace | |
| parent | 453c25c4b453fa0af26bebbd8863abfb79218119 (diff) | |
| parent | 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (diff) | |
merge: Add support for trace writing
This pull request extends the trace API to support writing new traces.
- Unify columns of different tables
- Support column lookup via index
- Use index lookup in trace loader
- Add property for describing partition keys
- Simplify TraceFormat SPI interface
- Add support for writing traces
**Breaking API Changes**
- `TraceFormat` SPI interface has been redesigned.
Diffstat (limited to 'opendc-trace')
61 files changed, 1838 insertions, 2101 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 219002e0..f1977945 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -29,28 +29,40 @@ import java.time.Instant * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn<String> = stringColumn("resource:id") +public val RESOURCE_ID: TableColumn<String> = column("resource:id") + +/** + * The cluster to which the resource belongs. + */ +@JvmField +public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id") /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:start_time", Instant::class.java) +public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time") /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time") /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count") +public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count") + +/** + * Total CPU capacity of the resource in MHz. + */ +@JvmField +public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity") /** * Memory capacity for the resource in KB. 
*/ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity") +public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index b683923b..44762da5 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -27,103 +27,73 @@ import java.time.Duration import java.time.Instant /** - * Identifier of the resource. - */ -@JvmField -public val RESOURCE_STATE_ID: TableColumn<String> = stringColumn("resource_state:id") - -/** - * The cluster to which the resource belongs. - */ -@JvmField -public val RESOURCE_STATE_CLUSTER_ID: TableColumn<String> = stringColumn("resource_state:cluster_id") - -/** * Timestamp for the state. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = TableColumn("resource_state:timestamp", Instant::class.java) +public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp") /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn<Duration> = TableColumn("resource_state:duration", Duration::class.java) +public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration") /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("resource_state:powered_on") - -/** - * Number of CPUs for the resource. - */ -@JvmField -public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count") - -/** - * Total CPU capacity of the resource in MHz. 
- */ -@JvmField -public val RESOURCE_STATE_CPU_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:cpu_capacity") +public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on") /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = doubleColumn("resource_state:cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage") /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct") /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = doubleColumn("resource_state:cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand") /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_ready_pct") - -/** - * Memory capacity of the resource in KB. - */ -@JvmField -public val RESOURCE_STATE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:mem_capacity") +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct") /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = doubleColumn("resource_state:mem_usage") +public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage") /** * Disk read throughput of the resource in KB/s. 
*/ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = doubleColumn("resource_state:disk_read") +public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read") /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = doubleColumn("resource_state:disk_write") +public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write") /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn<Double> = doubleColumn("resource_state:net_rx") +public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx") /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn<Double> = doubleColumn("resource_state:net_tx") +public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 6aca2051..b0181cbc 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -32,14 +32,14 @@ public interface Table { public val name: String /** - * A flag to indicate that the table is synthetic (derived from another table). + * The list of columns supported in this table. */ - public val isSynthetic: Boolean + public val columns: List<TableColumn<*>> /** - * The list of columns supported in this table. + * The columns by which the table is partitioned. */ - public val columns: List<TableColumn<*>> + public val partitionKeys: List<TableColumn<*>> /** * Open a [TableReader] for this table. @@ -47,7 +47,9 @@ public interface Table { public fun newReader(): TableReader /** - * Open a [TableReader] for [partition] of the table. 
+ * Open a [TableWriter] for this table. + * + * @throws UnsupportedOperationException if writing is not supported by the table. */ - public fun newReader(partition: String): TableReader + public fun newWriter(): TableWriter } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt index 64920498..31a58360 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt @@ -24,36 +24,11 @@ package org.opendc.trace /** - * Construct a [TableColumn] with [Any] type. + * Construct a [TableColumn] with the specified [name] and type [T]. */ -public fun objectColumn(name: String): TableColumn<Any> = TableColumn(name, Any::class.java) +public inline fun <reified T> column(name: String): TableColumn<T> = column(name, T::class.java) /** - * Construct a [TableColumn] with a [String] type. + * Construct a [TableColumn] with the specified [name] and [type]. */ -public fun stringColumn(name: String): TableColumn<String> = TableColumn(name, String::class.java) - -/** - * Construct a [TableColumn] with a [Number] type. - */ -public fun numberColumn(name: String): TableColumn<Number> = TableColumn(name, Number::class.java) - -/** - * Construct a [TableColumn] with an [Int] type. - */ -public fun intColumn(name: String): TableColumn<Int> = TableColumn(name, Int::class.java) - -/** - * Construct a [TableColumn] with a [Long] type. - */ -public fun longColumn(name: String): TableColumn<Long> = TableColumn(name, Long::class.java) - -/** - * Construct a [TableColumn] with a [Double] type. - */ -public fun doubleColumn(name: String): TableColumn<Double> = TableColumn(name, Double::class.java) - -/** - * Construct a [TableColumn] with a [Boolean] type. 
- */ -public fun booleanColumn(name: String): TableColumn<Boolean> = TableColumn(name, Boolean::class.java) +public fun <T> column(name: String, type: Class<T>): TableColumn<T> = TableColumn(name, type) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt index b5e7669f..8a796e6c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -34,34 +34,129 @@ public interface TableReader : AutoCloseable { public fun nextRow(): Boolean /** + * Resolve the index of the specified [column] for this reader. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + + /** * Determine whether the [TableReader] supports the specified [column]. */ - public fun hasColumn(column: TableColumn<*>): Boolean + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param index The zero-based index of the column to check for a null value. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(index: Int): Boolean + + /** + * Obtain the object value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The object value of the column. + */ + public fun get(index: Int): Any? + + /** + * Obtain the boolean value of the column with the specified [index]. 
+ * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. + */ + public fun getBoolean(index: Int): Boolean + + /** + * Obtain the integer value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. + */ + public fun getInt(index: Int): Int + + /** + * Obtain the long value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. + */ + public fun getLong(index: Int): Long + + /** + * Obtain the double value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. + */ + public fun getDouble(index: Int): Double + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param column The column to lookup. + * @throws IllegalArgumentException if the column is not valid for this table. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column)) /** * Obtain the value of the current column with type [T]. + * + * @param column The column to obtain the value for. 
+ * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The object value of the column. */ - public fun <T> get(column: TableColumn<T>): T + public fun <T> get(column: TableColumn<T>): T { + // This cast should always succeed since we resolve the index of the typed column + @Suppress("UNCHECKED_CAST") + return get(resolve(column)) as T + } /** * Read the specified [column] as boolean. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. */ - public fun getBoolean(column: TableColumn<Boolean>): Boolean + public fun getBoolean(column: TableColumn<Boolean>): Boolean = getBoolean(resolve(column)) /** * Read the specified [column] as integer. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. */ - public fun getInt(column: TableColumn<Int>): Int + public fun getInt(column: TableColumn<Int>): Int = getInt(resolve(column)) /** * Read the specified [column] as long. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. */ - public fun getLong(column: TableColumn<Long>): Long + public fun getLong(column: TableColumn<Long>): Long = getLong(resolve(column)) /** * Read the specified [column] as double. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. 
*/ - public fun getDouble(column: TableColumn<Double>): Double + public fun getDouble(column: TableColumn<Double>): Double = getDouble(resolve(column)) /** * Closes the reader so that no further iteration or data access can be made. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt new file mode 100644 index 00000000..423ce86a --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * Base class for writing workload traces. + */ +public interface TableWriter : AutoCloseable { + /** + * Start a new row in the table. + */ + public fun startRow() + + /** + * Flush the current row to the table. 
+ */ + public fun endRow() + + /** + * Resolve the index of the specified [column] for this writer. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + + /** + * Determine whether the [TableWriter] supports the specified [column]. + */ + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Set the column at [index] to [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun set(index: Int, value: Any) + + /** + * Set the column at [index] to boolean [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(index: Int, value: Boolean) + + /** + * Set the column at [index] to integer [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(index: Int, value: Int) + + /** + * Set the column at [index] to long [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(index: Int, value: Long) + + /** + * Set the column at [index] to double [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. 
+ */ + public fun setDouble(index: Int, value: Double) + + /** + * Set [column] to [value]. + * + * @param column The column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value) + + /** + * Set [column] to boolean [value]. + * + * @param column The column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value) + + /** + * Set [column] to integer [value]. + * + * @param column The column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value) + + /** + * Set [column] to long [value]. + * + * @param column The column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value) + + /** + * Set [column] to double [value]. + * + * @param column The column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value) + + /** + * Flush any buffered content to the underlying target. + */ + public fun flush() + + /** + * Close the writer so that no more rows can be written. 
+ */ + public override fun close() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 46920dce..d103bce4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -30,72 +30,70 @@ import java.time.Instant * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn<String> = stringColumn("task:id") +public val TASK_ID: TableColumn<String> = column("task:id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id") /** - * A column containing the submit time of the task. + * A column containing the submission time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java) +public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time") /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn<Instant> = TableColumn("task:wait_time", type = Instant::class.java) +public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time") /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java) +public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime") /** * A column containing the parents of a task. 
*/ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>) +public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents") /** * A column containing the children of a task. */ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>) +public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children") /** * A column containing the requested CPUs of a task. */ @JvmField -public val TASK_REQ_NCPUS: TableColumn<Int> = intColumn("task:req_ncpus") +public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus") /** * A column containing the allocated CPUs of a task. */ @JvmField -public val TASK_ALLOC_NCPUS: TableColumn<Int> = intColumn("task:alloc_ncpus") +public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus") /** * A column containing the status of a task. */ @JvmField -public val TASK_STATUS: TableColumn<Int> = intColumn("task:status") +public val TASK_STATUS: TableColumn<Int> = column("task:status") /** * A column containing the group id of a task. */ @JvmField -public val TASK_GROUP_ID: TableColumn<Int> = intColumn("task:group_id") +public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id") /** * A column containing the user id of a task. 
*/ @JvmField -public val TASK_USER_ID: TableColumn<Int> = intColumn("task:user_id") +public val TASK_USER_ID: TableColumn<Int> = column("task:user_id") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 0ae45e86..64e8f272 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,9 +22,9 @@ package org.opendc.trace +import org.opendc.trace.internal.TraceImpl import org.opendc.trace.spi.TraceFormat import java.io.File -import java.net.URL import java.nio.file.Path /** @@ -48,31 +48,48 @@ public interface Trace { public companion object { /** - * Open a [Trace] at the specified [url] in the given [format]. + * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The path to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ - public fun open(url: URL, format: String): Trace { - val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } - return provider.open(url) - } + @JvmStatic + public fun open(path: File, format: String): Trace = open(path.toPath(), format) /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The [Path] to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ - public fun open(path: File, format: String): Trace { - return open(path.toURI().toURL(), format) + @JvmStatic + public fun open(path: Path, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return TraceImpl(provider, path) } /** - * Open a [Trace] at the specified [path] in the given [format]. + * Create a [Trace] at the specified [path] in the given [format]. 
* - * @throws IllegalArgumentException if [format] is not supported. + * @param path The [Path] to the trace. + * @param format The format of the trace to create. */ - public fun open(path: Path, format: String): Trace { - return open(path.toUri().toURL(), format) + @JvmStatic + public fun create(path: File, format: String): Trace = create(path.toPath(), format) + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: Path, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + provider.create(path) + return TraceImpl(provider, path) } } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index a755a107..24551edb 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -20,28 +20,36 @@ * SOFTWARE. */ -package org.opendc.trace.wtf +package org.opendc.trace.internal -import org.opendc.trace.TABLE_TASKS import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import java.util.* /** - * [Trace] implementation for the WTF format. + * Internal implementation of [Table]. */ -public class WtfTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) +internal class TableImpl(val trace: TraceImpl, override val name: String) : Table { + /** + * The details of this table. 
+ */ + private val details = trace.format.getDetails(trace.path, name) - override fun containsTable(name: String): Boolean = TABLE_TASKS == name + override val columns: List<TableColumn<*>> + get() = details.columns - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } + override val partitionKeys: List<TableColumn<*>> + get() = details.partitionKeys - return WtfTaskTable(path) - } + override fun newReader(): TableReader = trace.format.newReader(trace.path, name) - override fun toString(): String = "WtfTrace[$path]" + override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name) + + override fun toString(): String = "Table[name=$name]" + + override fun hashCode(): Int = Objects.hash(trace, name) + + override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt index 3e5029b4..fd9536ab 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt @@ -20,30 +20,37 @@ * SOFTWARE. */ -package org.opendc.trace.opendc +package org.opendc.trace.internal -import org.opendc.trace.TABLE_RESOURCES -import org.opendc.trace.TABLE_RESOURCE_STATES import org.opendc.trace.Table import org.opendc.trace.Trace +import org.opendc.trace.spi.TraceFormat import java.nio.file.Path +import java.util.* +import java.util.concurrent.ConcurrentHashMap /** - * A [Trace] in the OpenDC virtual machine trace format. + * Internal implementation of the [Trace] interface. 
*/ -public class OdcVmTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) +internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace { + /** + * A map containing the [TableImpl] instances associated with the trace. + */ + private val tableMap = ConcurrentHashMap<String, TableImpl>() - override fun containsTable(name: String): Boolean = - name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES + override val tables: List<String> = format.getTables(path) - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> OdcVmResourceTable(path) - TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path) - else -> null + init { + for (table in tables) { + tableMap.computeIfAbsent(table) { TableImpl(this, it) } } } - override fun toString(): String = "OdcVmTrace[$path]" + override fun containsTable(name: String): Boolean = tableMap.containsKey(name) + + override fun getTable(name: String): Table? = tableMap[name] + + override fun hashCode(): Int = Objects.hash(format, path) + + override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt index d4da735e..1a9b9ee1 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -20,27 +20,18 @@ * SOFTWARE. */ -package org.opendc.trace.swf +package org.opendc.trace.spi -import org.opendc.trace.TABLE_TASKS import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path +import org.opendc.trace.TableColumn /** - * [Trace] implementation for the SWF format. 
+ * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. + * + * @param columns The available columns in the table. + * @param partitionKeys The table columns that act as partition keys for the table. */ -public class SwfTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - return SwfTaskTable(path) - } - - override fun toString(): String = "SwfTrace[$path]" -} +public data class TableDetails( + val columns: List<TableColumn<*>>, + val partitionKeys: List<TableColumn<*>> = emptyList() +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 54029fcf..f2e610db 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,8 +22,9 @@ package org.opendc.trace.spi -import org.opendc.trace.Trace -import java.net.URL +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import java.nio.file.Path import java.util.* /** @@ -36,11 +37,52 @@ public interface TraceFormat { public val name: String /** - * Open a new [Trace] with this provider. + * Construct an empty trace at [path]. * - * @param url A reference to the trace. + * @param path The path where to create the empty trace. + * @throws IllegalArgumentException If [path] is invalid. + * @throws UnsupportedOperationException If the format does not support trace creation. */ - public fun open(url: URL): Trace + public fun create(path: Path) + + /** + * Return the names of the tables available in the trace at the specified [path]. + * + * @param path The path to the trace.
+ * @return The list of tables available in the trace. + */ + public fun getTables(path: Path): List<String> + + /** + * Return the details of [table] in the trace at the specified [path]. + * + * @param path The path to the trace. + * @param table The name of the table to obtain the details for. + * @throws IllegalArgumentException If [table] does not exist. + * @return The [TableDetails] for the specified [table]. + */ + public fun getDetails(path: Path, table: String): TableDetails + + /** + * Open a [TableReader] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableReader] for. + * @throws IllegalArgumentException If [table] does not exist. + * @return A [TableReader] instance for the table. + */ + public fun newReader(path: Path, table: String): TableReader + + /** + * Open a [TableWriter] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableWriter] for. + * @throws IllegalArgumentException If [table] does not exist. + * @throws UnsupportedOperationException If the format does not support writing. + * @return A [TableWriter] instance for the table. + */ + public fun newWriter(path: Path, table: String): TableWriter /** * A helper object for resolving providers. @@ -49,6 +91,7 @@ public interface TraceFormat { /** * A list of [TraceFormat] that are available on this system. */ + @JvmStatic public val installedProviders: List<TraceFormat> by lazy { val loader = ServiceLoader.load(TraceFormat::class.java) loader.toList() @@ -57,6 +100,7 @@ public interface TraceFormat { /** * Obtain a [TraceFormat] implementation by [name]. */ + @JvmStatic public fun byName(name: String): TraceFormat? 
= installedProviders.find { it.name == name } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt new file mode 100644 index 00000000..dafc0798 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.util + +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader + +/** + * A helper class to chain multiple [TableReader]s. + */ +public abstract class CompositeTableReader : TableReader { + /** + * A flag to indicate that the reader has started, meaning the user called [nextRow] at least once + * (and in turn [nextReader]).
+ */ + private var hasStarted = false + + /** + * The active [TableReader] instance. + */ + private var delegate: TableReader? = null + + /** + * Obtain the next [TableReader] instance to read from or `null` if there are no more readers to read from. + */ + protected abstract fun nextReader(): TableReader? + + override fun nextRow(): Boolean { + if (!hasStarted) { + assert(delegate == null) { "Duplicate initialization" } + delegate = nextReader() + hasStarted = true + } + + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextReader() + this.delegate = delegate + } + + return delegate != null + } + + override fun resolve(column: TableColumn<*>): Int { + val delegate = delegate + return delegate?.resolve(column) ?: -1 + } + + override fun isNull(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.isNull(index) + } + + override fun get(index: Int): Any? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(index) + } + + override fun getBoolean(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(index) + } + + override fun getInt(index: Int): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(index) + } + + override fun getLong(index: Int): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(index) + } + + override fun getDouble(index: Int): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(index) + } + + override fun close() { + delegate?.close() + } + + override fun toString(): String = "CompositeTableReader" +} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt 
b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt deleted file mode 100644 index 84c9b347..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. 
- */ - private val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - this.delegate = delegate - } - - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun <T> get(column: TableColumn<T>): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn<Int>): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn<Long>): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn<Double>): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? 
{ - return if (it.hasNext()) { - val (_, path) = it.next() - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } - - override fun toString(): String = "AzureResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index c17a17ab..da8181fe 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -60,42 +60,37 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..columns.size) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - RESOURCE_STATE_ID -> id - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") + override fun get(index: Int): Any? 
{ + return when (index) { + COL_ID -> id + COL_TIMESTAMP -> timestamp + COL_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column index") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { throw IllegalArgumentException("Invalid column") } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + override fun getDouble(index: Int): Double { + return when (index) { + COL_CPU_USAGE_PCT -> cpuUsagePct else -> throw IllegalArgumentException("Invalid column") } } @@ -133,6 +128,15 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta cpuUsagePct = Double.NaN } + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_CPU_USAGE_PCT = 2 + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. 
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt deleted file mode 100644 index 96ee3158..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * The resource [Table] for the Azure v1 VM traces. 
- */ -internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY - ) - - override fun newReader(): TableReader { - return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("No partition $partition") - } - - override fun toString(): String = "AzureResourceTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index 5ea97483..a6352613 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -62,49 +62,42 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..columns.size) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any? 
= when (column) { - RESOURCE_ID -> id - RESOURCE_START_TIME -> startTime - RESOURCE_STOP_TIME -> stopTime - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_START_TIME -> startTime + COL_STOP_TIME -> stopTime + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - RESOURCE_CPU_COUNT -> cpuCores + override fun getInt(index: Int): Int { + return when (index) { + COL_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_MEM_CAPACITY -> memCapacity + override fun getDouble(index: Int): Double { + return when (index) { + COL_MEM_CAPACITY -> memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -138,7 +131,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe /** * Reset the state. 
*/ - fun reset() { + private fun reset() { id = null startTime = null stopTime = null @@ -146,6 +139,19 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe memCapacity = Double.NaN } + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt deleted file mode 100644 index c7e7dc36..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Azure v1 VM traces. - */ -public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = name in tables - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> AzureResourceTable(factory, path) - TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "AzureTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 1230d857..253c7057 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import 
kotlin.io.path.nameWithoutExtension /** * A format implementation for the Azure v1 format. @@ -45,12 +50,68 @@ public class AzureTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT + ), + listOf(RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all VM CPU readings. 
*/ - override fun open(url: URL): AzureTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return AzureTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (_, partPath) = it.next() + return AzureResourceStateTableReader(factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "AzureCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index e5735f0d..b73bb728 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [AzureTraceFormat] class. 
@@ -36,54 +35,29 @@ class AzureTraceFormatTest { private val format = AzureTraceFormat() @Test - fun testTraceExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testResources() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() - + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, @@ -96,14 +70,12 @@ class AzureTraceFormatTest { @Test fun testSmoke() { - val url = 
File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_STATE_ID)) }, + { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_ID)) }, { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, { assertEquals(2.86979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) } ) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt deleted file mode 100644 index 4a60dff3..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the extended Bitbrains format. - */ -internal class BitbrainsExResourceStateTable(path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_COUNT, - RESOURCE_STATE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_STATE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - ) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : TableReader { - var delegate: TableReader? 
= nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - this.delegate = delegate - } - - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun <T> get(column: TableColumn<T>): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn<Int>): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn<Long>): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn<Double>): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? 
{ - return if (it.hasNext()) { - val (_, path) = it.next() - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } else { - null - } - } - - override fun toString(): String = "SvCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } - - override fun toString(): String = "BitbrainsExResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index f1cf7307..c1b6f5ba 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -88,70 +88,53 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_CLUSTER_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_COUNT -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_CPU_DEMAND -> true - RESOURCE_STATE_CPU_READY_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..COL_MAX) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val 
res: Any? = when (column) { - RESOURCE_STATE_ID -> id - RESOURCE_STATE_CLUSTER_ID -> cluster - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) - RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) - RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) - RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) - RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) - RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_CLUSTER_ID -> cluster + COL_TIMESTAMP -> timestamp + COL_NCPUS -> getInt(index) + COL_POWERED_ON -> getInt(index) + COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_CPU_READY_PCT, COL_CPU_DEMAND, COL_MEM_CAPACITY, COL_DISK_READ, COL_DISK_WRITE -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - return when (column) { - RESOURCE_STATE_POWERED_ON -> poweredOn + override fun getBoolean(index: Int): Boolean { + return when (index) { + COL_POWERED_ON -> poweredOn else -> throw IllegalArgumentException("Invalid column") } } - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - RESOURCE_STATE_CPU_COUNT -> cpuCores + override fun getInt(index: Int): Int { + return when (index) { + COL_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - 
RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity - RESOURCE_STATE_CPU_DEMAND -> cpuDemand - RESOURCE_STATE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite + override fun getDouble(index: Int): Double { + return when (index) { + COL_CPU_CAPACITY -> cpuCapacity + COL_CPU_USAGE -> cpuUsage + COL_CPU_USAGE_PCT -> cpuUsage / cpuCapacity + COL_CPU_READY_PCT -> cpuReadyPct + COL_CPU_DEMAND -> cpuDemand + COL_MEM_CAPACITY -> memCapacity + COL_DISK_READ -> diskRead + COL_DISK_WRITE -> diskWrite else -> throw IllegalArgumentException("Invalid column") } } @@ -209,4 +192,21 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR private val COL_CPU_CAPACITY = 18 private val COL_ID = 19 private val COL_MEM_CAPACITY = 20 + private val COL_CPU_USAGE_PCT = 21 + private val COL_MAX = COL_CPU_USAGE_PCT + 1 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_CLUSTER_ID to COL_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_CPU_COUNT to COL_NCPUS, + RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND to COL_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT to COL_CPU_READY_PCT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ to COL_DISK_READ, + RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE + ) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt deleted file mode 100644 index f16c493d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this 
software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the extended Bitbrains format. - */ -public class BitbrainsExTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name - - override fun getTable(name: String): Table? 
{ - if (!containsTable(name)) { - return null - } - - return BitbrainsExResourceStateTable(path) - } - - override fun toString(): String = "BitbrainsExTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 06388a84..20222c8a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -22,10 +22,16 @@ package org.opendc.trace.bitbrains +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.bufferedReader +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the extended Bitbrains trace format. 
@@ -36,12 +42,67 @@ public class BitbrainsExTraceFormat : TraceFormat { */ override val name: String = "bitbrains-ex" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all resource state partitions. */ - override fun open(url: URL): BitbrainsExTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsExTrace(path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? 
{ + return if (it.hasNext()) { + val (_, partPath) = it.next() + return BitbrainsExResourceStateTableReader(partPath.bufferedReader()) + } else { + null + } + } + + override fun toString(): String = "BitbrainsExCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt deleted file mode 100644 index 7241b18b..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the Bitbrains format. - */ -internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_COUNT, - RESOURCE_STATE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - RESOURCE_STATE_NET_RX, - RESOURCE_STATE_NET_TX, - ) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : TableReader { - var delegate: TableReader? 
= nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - this.delegate = delegate - } - - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun <T> get(column: TableColumn<T>): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn<Int>): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn<Long>): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn<Double>): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? 
{ - return if (it.hasNext()) { - val (partition, path) = it.next() - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "BitbrainsCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } - - override fun toString(): String = "BitbrainsResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 56e66f5c..3a8839b4 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -111,71 +111,49 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_COUNT -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_MEM_USAGE -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - RESOURCE_STATE_NET_RX -> true - RESOURCE_STATE_NET_TX -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val 
res: Any? = when (column) { - RESOURCE_STATE_ID -> partition - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_COUNT -> cpuCores - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_MEM_USAGE -> memUsage - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite - RESOURCE_STATE_NET_RX -> netReceived - RESOURCE_STATE_NET_TX -> netTransmitted + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> partition + COL_TIMESTAMP -> timestamp + COL_CPU_COUNT -> getInt(index) + COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_MEM_CAPACITY, COL_MEM_USAGE, COL_DISK_READ, COL_DISK_WRITE, COL_NET_RX, COL_NET_TX -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - RESOURCE_STATE_CPU_COUNT -> cpuCores + override fun getInt(index: Int): Int { + return when (index) { + COL_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_MEM_USAGE -> memUsage - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite - RESOURCE_STATE_NET_RX -> netReceived - RESOURCE_STATE_NET_TX -> netTransmitted + 
override fun getDouble(index: Int): Double { + return when (index) { + COL_CPU_CAPACITY -> cpuCapacity + COL_CPU_USAGE -> cpuUsage + COL_CPU_USAGE_PCT -> cpuUsagePct + COL_MEM_CAPACITY -> memCapacity + COL_MEM_USAGE -> memUsage + COL_DISK_READ -> diskRead + COL_DISK_WRITE -> diskWrite + COL_NET_RX -> netReceived + COL_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } } @@ -249,6 +227,34 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, netTransmitted = Double.NaN } + private val COL_TIMESTAMP = 0 + private val COL_CPU_COUNT = 1 + private val COL_CPU_CAPACITY = 2 + private val COL_CPU_USAGE = 3 + private val COL_CPU_USAGE_PCT = 4 + private val COL_MEM_CAPACITY = 5 + private val COL_MEM_USAGE = 6 + private val COL_DISK_READ = 7 + private val COL_DISK_WRITE = 8 + private val COL_NET_RX = 9 + private val COL_NET_TX = 10 + private val COL_ID = 11 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE to COL_MEM_USAGE, + RESOURCE_STATE_DISK_READ to COL_DISK_READ, + RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE, + RESOURCE_STATE_NET_RX to COL_NET_RX, + RESOURCE_STATE_NET_TX to COL_NET_TX + ) + /** * The type of the timestamp in the trace. 
*/ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt deleted file mode 100644 index bc4f0b7d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resources [Table] in the Bitbrains format. 
- */ -internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The VMs that belong to the table. - */ - private val vms = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = true - - override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID) - - override fun newReader(): TableReader { - return BitbrainsResourceTableReader(factory, vms) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } - - override fun toString(): String = "BitbrainsResourceTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt index c02dc5ae..3701994a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -43,13 +43,14 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms val parser = factory.createParser(path.toFile()) val reader = BitbrainsResourceStateTableReader(name, parser) + val idCol = reader.resolve(RESOURCE_ID) try { if (!reader.nextRow()) { continue } - id = reader.get(RESOURCE_STATE_ID) + id = reader.get(idCol) as String return true } finally { reader.close() @@ -59,36 +60,33 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms return false } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = 
columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - RESOURCE_ID -> id + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { throw IllegalArgumentException("Invalid column") } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -105,4 +103,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms private fun reset() { id = null } + + private val COL_ID = 0 + private val columns = mapOf(RESOURCE_ID to COL_ID) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt deleted file mode 100644 index bcd8dd52..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, 
merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Bitbrains format. - */ -public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = tables.contains(name) - - override fun getTable(name: String): Table? 
{ - return when (name) { - TABLE_RESOURCES -> BitbrainsResourceTable(factory, path) - TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "BitbrainsTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index 55b11fe3..3885c931 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the GWF trace format. 
@@ -45,12 +50,75 @@ public class BitbrainsTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID)) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + RESOURCE_STATE_NET_RX, + RESOURCE_STATE_NET_TX, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val vms = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + BitbrainsResourceTableReader(factory, vms) + } + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** - * Open a Bitbrains trace. + * Construct a [TableReader] for reading over all resource state partitions. 
*/ - override fun open(url: URL): BitbrainsTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (partition, partPath) = it.next() + return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index 2e4f176a..d734cf5f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -26,62 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsExTraceFormat] class. 
*/ -class BitbrainsExTraceFormatTest { +internal class BitbrainsExTraceFormatTest { private val format = BitbrainsExTraceFormat() @Test - fun testTraceExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/vm.txt") - assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/vm.txt") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/vm.txt") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testSmoke() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/vm.txt") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git 
a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index ff4a33f8..41e7def2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -26,66 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsTraceFormat] class. */ class BitbrainsTraceFormatTest { - @Test - fun testTraceExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } + private val format = BitbrainsTraceFormat() @Test fun testTables() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/bitbrains.csv") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = 
Paths.get("src/test/resources/bitbrains.csv") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/bitbrains.csv") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testResources() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -98,11 +70,8 @@ class BitbrainsTraceFormatTest { @Test fun testSmoke() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt deleted file mode 100644 index fd7bd068..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and 
associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * A [Table] containing the tasks in a GWF trace. 
- */ -internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS - ) - - override fun newReader(): TableReader { - return GwfTaskTableReader(factory.createParser(url)) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "GwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 39eb5520..aa4c543b 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -67,52 +67,43 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - TASK_WORKFLOW_ID -> workflowId - TASK_ID -> jobId - TASK_SUBMIT_TIME -> submitTime - TASK_RUNTIME -> runtime - TASK_REQ_NCPUS -> nProcs - TASK_ALLOC_NCPUS -> reqNProcs - TASK_PARENTS -> dependencies + override fun get(index: Int): Any? 
{ + return when (index) { + COL_JOB_ID -> jobId + COL_WORKFLOW_ID -> workflowId + COL_SUBMIT_TIME -> submitTime + COL_RUNTIME -> runtime + COL_REQ_NPROC -> getInt(index) + COL_NPROC -> getInt(index) + COL_DEPS -> dependencies else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - TASK_REQ_NCPUS -> nProcs - TASK_ALLOC_NCPUS -> reqNProcs + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NPROC -> reqNProcs + COL_NPROC -> nProcs else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -180,6 +171,24 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { dependencies = emptySet() } + private val COL_WORKFLOW_ID = 0 + private val COL_JOB_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_RUNTIME = 3 + private val COL_NPROC = 4 + private val COL_REQ_NPROC = 5 + private val COL_DEPS = 6 + + private val columns = mapOf( + TASK_ID to COL_JOB_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_RUNTIME to COL_RUNTIME, + TASK_ALLOC_NCPUS to COL_NPROC, + TASK_REQ_NCPUS to COL_REQ_NPROC, + TASK_PARENTS to COL_DEPS + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. 
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt deleted file mode 100644 index 166c1e56..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * [Trace] implementation for the GWF format. - */ -public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? 
{ - if (!containsTable(name)) { - return null - } - - return GwfTaskTable(factory, url) - } - - override fun toString(): String = "GwfTrace[$url]" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 6d542503..d4287420 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -24,10 +24,10 @@ package org.opendc.trace.gwf import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the GWF trace format. @@ -45,12 +45,38 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) - /** - * Read the tasks in the GWF trace. 
- */ - public override fun open(url: URL): GwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return GwfTrace(factory, url) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + ), + listOf(TASK_WORKFLOW_ID) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index b209b979..7fe403b2 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -22,13 +22,10 @@ package org.opendc.trace.gwf +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -36,59 +33,32 @@ import 
java.time.Instant * Test suite for the [GwfTraceFormat] class. */ internal class GwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = GwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testTableReader() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val 
format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - val reader = table.newReader() + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -99,13 +69,4 @@ internal class GwfTraceFormatTest { { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, ) } - - @Test - fun testTableReaderPartition() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt deleted file mode 100644 index bee4ba7e..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource state [Table] in the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCE_STATES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_STATE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) - return OdcVmResourceStateTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index df3bcfa6..b5043f82 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - 
RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_CPU_COUNT -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun <T> get(column: TableColumn<T>): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_STATE_ID -> record[COL_ID].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) - RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) + COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) + COL_CPU_COUNT -> getInt(index) + COL_CPU_USAGE -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override 
fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() + return when (index) { + COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - COL_DURATION = schema.getField("duration").pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + AVRO_COL_DURATION = schema.getField("duration").pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema", e) } } - private var COL_ID = -1 - private var COL_TIMESTAMP = -1 - private var COL_DURATION = -1 - private var COL_CPU_COUNT = -1 - private var COL_CPU_USAGE = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_TIMESTAMP = -1 + private var AVRO_COL_DURATION = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var AVRO_COL_CPU_USAGE = -1 + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + 
RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt new file mode 100644 index 00000000..15a8cb85 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Duration +import java.time.Instant + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceStateTableWriter( + private val writer: ParquetWriter<GenericRecord>, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource state schema. + */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + + val record = builder.build() + val id = record[COL_ID] as String + val timestamp = record[COL_TIMESTAMP] as Long + + check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + + writer.write(builder.build()) + + lastId = id + lastTimestamp = timestamp + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos() + RESOURCE_STATE_DURATION -> schema.getField("duration").pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + + builder.set( + fields[index], + when (index) { + COL_TIMESTAMP -> (value as Instant).toEpochMilli() + COL_DURATION -> 
(value as Duration).toMillis() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Last column values that are used to check for correct partitioning. + */ + private var lastId: String? = null + private var lastTimestamp: Long = Long.MIN_VALUE + + /** + * Columns with special behavior. + */ + private val COL_ID = resolve(RESOURCE_ID) + private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP) + private val COL_DURATION = resolve(RESOURCE_STATE_DURATION) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt deleted file mode 100644 index b1456560..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource [Table] for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY, - ) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) - return OdcVmResourceTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index c52da62d..d93929aa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G return record != null } - 
override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun <T> get(column: TableColumn<T>): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) + COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + 
override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() + return when (index) { + COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema") } } - private var COL_ID = -1 - private var COL_START_TIME = -1 - private var COL_STOP_TIME = -1 - private var COL_CPU_COUNT = -1 - private var COL_MEM_CAPACITY = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_START_TIME = -1 + private var AVRO_COL_STOP_TIME = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var 
AVRO_COL_MEM_CAPACITY = -1 + + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt new file mode 100644 index 00000000..9cc6ca7d --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Instant +import kotlin.math.roundToLong + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceTableWriter( + private val writer: ParquetWriter<GenericRecord>, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource schema. + */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + writer.write(builder.build()) + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + builder.set( + fields[index], + when (index) { + COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli() + COL_MEM_CAPACITY -> (value as Double).roundToLong() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override 
fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Columns with special behavior. + */ + private val COL_START_TIME = resolve(RESOURCE_START_TIME) + private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME) + private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 8edba725..9b32f8fd 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -24,11 +24,18 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecord +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalOutputFile +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Files +import java.nio.file.Path /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. @@ -39,13 +46,83 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" - /** - * Open a Bitbrains Parquet trace. 
- */ - override fun open(url: URL): OdcVmTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return OdcVmTrace(path) + override fun create(path: Path) { + // Construct directory containing the trace files + Files.createDirectory(path) + + val tables = getTables(path) + + for (table in tables) { + val writer = newWriter(path, table) + writer.close() + } + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_DURATION, + RESOURCE_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) + OdcVmResourceTableReader(reader) + } + TABLE_RESOURCE_STATES -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) + OdcVmResourceStateTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + return when (table) { + TABLE_RESOURCES -> { + val schema = RESOURCES_SCHEMA + val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceTableWriter(writer, schema) + } + TABLE_RESOURCE_STATES -> { 
+ val schema = RESOURCE_STATES_SCHEMA + val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withDictionaryEncoding("id", true) + .withBloomFilterEnabled("id", true) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceStateTableWriter(writer, schema) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } public companion object { diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 42eb369e..bfe0f881 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,8 +29,7 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [OdcVmTraceFormat] implementation. 
@@ -39,52 +38,30 @@ internal class OdcVmTraceFormatTest { private val format = OdcVmTraceFormat() @Test - fun testTraceExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertDoesNotThrow { format.open(url) } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace-v2.1") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace-v2.1") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -104,14 +81,12 @@ internal class OdcVmTraceFormatTest { @ParameterizedTest 
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_STATE_ID)) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt deleted file mode 100644 index 7ec0d607..00000000 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.* -import java.nio.file.Path -import kotlin.io.path.bufferedReader - -/** - * A [Table] containing the tasks in a SWF trace. - */ -internal class SwfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override fun newReader(): TableReader { - val reader = path.bufferedReader() - return SwfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "SwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 3f49c770..2f6ea6ee 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -69,64 +69,43 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - TASK_STATUS -> true - TASK_GROUP_ID -> true - TASK_USER_ID 
-> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in columns.values) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any = when (column) { - TASK_ID -> fields[COL_JOB_ID] - TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) - TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) - TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) - TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) - TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) - TASK_PARENTS -> { - val parent = fields[COL_PARENT_JOB].toLong(10) + override fun get(index: Int): Any? { + return when (index) { + COL_JOB_ID -> fields[index] + COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) + COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) + COL_PARENT_JOB -> { + val parent = fields[index].toLong(10) if (parent < 0) emptySet() else setOf(parent) } - TASK_STATUS -> getInt(TASK_STATUS) - TASK_GROUP_ID -> getInt(TASK_GROUP_ID) - TASK_USER_ID -> getInt(TASK_USER_ID) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10) - TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10) - TASK_STATUS -> fields[COL_STATUS].toInt(10) - TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10) - TASK_USER_ID -> fields[COL_USER_ID].toInt(10) + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NCPUS, COL_ALLOC_NCPUS, 
COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -155,4 +134,17 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val COL_PART_NUM = 15 private val COL_PARENT_JOB = 16 private val COL_PARENT_THINK_TIME = 17 + + private val columns = mapOf( + TASK_ID to COL_JOB_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_WAIT_TIME to COL_WAIT_TIME, + TASK_RUNTIME to COL_RUN_TIME, + TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS, + TASK_REQ_NCPUS to COL_REQ_NCPUS, + TASK_STATUS to COL_STATUS, + TASK_USER_ID to COL_USER_ID, + TASK_GROUP_ID to COL_GROUP_ID, + TASK_PARENTS to COL_PARENT_JOB + ) } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 36c3122e..1fd076d5 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -22,10 +22,11 @@ package org.opendc.trace.swf +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path +import kotlin.io.path.bufferedReader /** * Support for the Standard Workload Format (SWF) in OpenDC. 
@@ -35,9 +36,41 @@ import kotlin.io.path.exists public class SwfTraceFormat : TraceFormat { override val name: String = "swf" - override fun open(url: URL): SwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return SwfTrace(path) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 828c2bfa..4dcd43f6 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -27,61 +27,38 @@ import org.junit.jupiter.api.Assertions.* import org.opendc.trace.TABLE_TASKS import org.opendc.trace.TASK_ALLOC_NCPUS import org.opendc.trace.TASK_ID -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [SwfTraceFormat] class. 
*/ internal class SwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = SwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testReader() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = 
Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -94,14 +71,4 @@ internal class SwfTraceFormatTest { reader.close() } - - @Test - fun testReaderPartition() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - - assertThrows<IllegalArgumentException> { - trace.getTable(TABLE_TASKS)!!.newReader("test") - } - } } diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 35190dba..14a0fc7c 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -29,19 +29,18 @@ plugins { } application { - mainClass.set("org.opendc.trace.tools.TraceConverterKt") + mainClass.set("org.opendc.trace.tools.TraceConverter") } dependencies { api(platform(projects.opendcPlatform)) - implementation(projects.opendcTrace.opendcTraceParquet) - implementation(projects.opendcTrace.opendcTraceOpendc) - implementation(projects.opendcTrace.opendcTraceAzure) - implementation(projects.opendcTrace.opendcTraceBitbrains) - + implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) + runtimeOnly(projects.opendcTrace.opendcTraceBitbrains) + runtimeOnly(projects.opendcTrace.opendcTraceAzure) runtimeOnly(libs.log4j.slf4j) } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 322464cd..6fad43be 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -20,6 +20,7 @@ * SOFTWARE. 
*/ +@file:JvmName("TraceConverter") package org.opendc.trace.tools import com.github.ajalt.clikt.core.CliktCommand @@ -29,28 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.* import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* -import org.opendc.trace.azure.AzureTraceFormat -import org.opendc.trace.bitbrains.BitbrainsExTraceFormat -import org.opendc.trace.bitbrains.BitbrainsTraceFormat -import org.opendc.trace.opendc.OdcVmTraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File +import java.time.Duration +import java.time.Instant import java.util.* import kotlin.math.abs import kotlin.math.max import kotlin.math.min -import kotlin.math.roundToLong /** * A script to convert a trace in text format into a Parquet trace. */ -public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) +fun main(args: Array<String>): Unit = TraceConverterCli().main(args) /** * Represents the command for converting traces @@ -77,15 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The input format of the trace. */ - private val format by option("-f", "--format", help = "input format of trace") - .choice( - "solvinity" to BitbrainsExTraceFormat(), - "bitbrains" to BitbrainsTraceFormat(), - "azure" to AzureTraceFormat() - ) + private val inputFormat by option("-f", "--input-format", help = "format of output trace") .required() /** + * The format of the output trace. + */ + private val outputFormat by option("--output-format", help = "format of output trace") + .default("opendc-vm") + + /** * The sampling options. 
*/ private val samplingOptions by SamplingOptions().cooccurring() @@ -101,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - val trace = format.open(input.toURI().toURL()) + val inputTrace = Trace.open(input, format = inputFormat) + val outputTrace = Trace.create(output, format = outputFormat) logger.info { "Building resources table" } - val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet)) - .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enablePageWriteChecksum() - .build() + val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter() - val selectedVms = metaWriter.use { convertResources(trace, it) } + val selectedVms = metaWriter.use { convertResources(inputTrace, it) } if (selectedVms.isEmpty()) { logger.warn { "No VMs selected" } @@ -121,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { logger.info { "Wrote ${selectedVms.size} rows" } logger.info { "Building resource states table" } - val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet)) - .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withBloomFilterNDV("id", selectedVms.size.toLong()) - .enableValidation() - .build() + val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter() - val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } + val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) } logger.info { "Wrote $statesCount rows" } } /** * Convert the resources table for the trace. 
*/ - private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> { + private fun convertResources(trace: Trace, writer: TableWriter): Set<String> { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() @@ -154,39 +137,37 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var stopTime = Long.MIN_VALUE do { - id = reader.get(RESOURCE_STATE_ID) + id = reader.get(RESOURCE_ID) val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT)) + numCpus = max(numCpus, reader.getInt(RESOURCE_CPU_COUNT)) - memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) + memCapacity = max(memCapacity, reader.getDouble(RESOURCE_MEM_CAPACITY)) if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) } hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + } while (hasNextRow && id == reader.get(RESOURCE_ID)) // Sample only a fraction of the VMs if (random != null && random.nextDouble() > samplingFraction) { continue } - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA) - - builder["id"] = id - builder["start_time"] = startTime - builder["stop_time"] = stopTime - builder["cpu_count"] = numCpus - builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong() - logger.info { "Selecting VM $id" } - - writer.write(builder.build()) selectedVms.add(id) + + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime)) + writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime)) + writer.setInt(RESOURCE_CPU_COUNT, numCpus) + 
writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) + writer.endRow() } return selectedVms @@ -195,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * Convert the resource states table for the trace. */ - private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int { + private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() var hasNextRow = reader.nextRow() @@ -204,14 +185,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var lastTimestamp = 0L while (hasNextRow) { - val id = reader.get(RESOURCE_STATE_ID) + val id = reader.get(RESOURCE_ID) if (id !in selectedVms) { hasNextRow = reader.nextRow() continue } - val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cpuCount = reader.getInt(RESOURCE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() @@ -233,20 +214,18 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { break } - val shouldContinue = id == reader.get(RESOURCE_STATE_ID) && + val shouldContinue = id == reader.get(RESOURCE_ID) && abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 && - cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT) + cpuCount == reader.getInt(RESOURCE_CPU_COUNT) } while (shouldContinue) - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - - builder["id"] = id - builder["timestamp"] = startTimestamp - builder["duration"] = duration - builder["cpu_count"] = cpuCount - builder["cpu_usage"] = cpuUsage - - writer.write(builder.build()) + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp)) + writer.set(RESOURCE_STATE_DURATION, 
Duration.ofMillis(duration)) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) + writer.endRow() count++ diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt deleted file mode 100644 index 7b7f979f..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a WfCommons workload trace. 
- */ -internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN - ) - - override fun newReader(): TableReader { - val parser = factory.createParser(path.toFile()) - return WfFormatTaskTableReader(parser) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WfFormatTaskTable" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index 4408ba5c..7f378d80 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -94,49 +94,41 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe return hasJob } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column value" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any? 
= when (column) { - TASK_ID -> id - TASK_WORKFLOW_ID -> workflowId - TASK_RUNTIME -> runtime - TASK_PARENTS -> parents - TASK_CHILDREN -> children - TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_WORKFLOW_ID -> workflowId + COL_RUNTIME -> runtime + COL_PARENTS -> parents + COL_CHILDREN -> children + COL_NPROC -> getInt(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - TASK_REQ_NCPUS -> cores + override fun getInt(index: Int): Int { + return when (index) { + COL_NPROC -> cores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -231,4 +223,20 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe children = null cores = -1 } + + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_RUNTIME = 3 + private val COL_NPROC = 4 + private val COL_PARENTS = 5 + private val COL_CHILDREN = 6 + + private val columns = mapOf( + TASK_ID to COL_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_RUNTIME to COL_RUNTIME, + TASK_REQ_NCPUS to COL_NPROC, + TASK_PARENTS to COL_PARENTS, + TASK_CHILDREN to COL_CHILDREN, + ) } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt deleted 
file mode 100644 index 2d9c79fb..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * [Trace] implementation for the WfCommons workload trace format. - */ -public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? 
{ - return when (name) { - TABLE_TASKS -> WfFormatTaskTable(factory, path) - else -> null - } - } - - override fun toString(): String = "WfFormatTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index ff8d054c..c75e3cbb 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -23,10 +23,10 @@ package org.opendc.trace.wfformat import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the WfCommons workload trace format. 
@@ -39,9 +39,37 @@ public class WfFormatTraceFormat : TraceFormat { override val name: String = "wfformat" - override fun open(url: URL): WfFormatTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WfFormatTrace(factory, path) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 0bfc8840..217b175d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -22,59 +22,38 @@ package org.opendc.trace.wfformat +import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File 
-import java.net.URL +import java.nio.file.Paths /** * Test suite for the [WfFormatTraceFormat] class. */ class WfFormatTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertDoesNotThrow { format.open(input) } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) } - } + private val format = WfFormatTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get("src/test/resources/trace.json") + Assertions.assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } /** @@ -82,9 +61,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = 
Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -110,9 +88,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReaderFull() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertDoesNotThrow { while (reader.nextRow()) { @@ -121,13 +98,4 @@ class WfFormatTraceFormatTest { reader.close() } } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts index 5051c7b0..e4f0ab3a 100644 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -34,4 +34,6 @@ dependencies { api(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) + + testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt deleted file mode 100644 index 74202718..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, 
merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a GWF trace. 
- */ -internal class WtfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) - return WtfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WtfTaskTable" -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index 5e2463f8..45ec25dd 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.wtf +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -37,73 +38,126 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. 
+ */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun <T> get(column: TableColumn<T>): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - TASK_ID -> (record["id"] as Long).toString() - TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString() - TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long) - TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long) - TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long) - TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() - TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() - TASK_GROUP_ID -> record["group_id"] - TASK_USER_ID -> record["user_id"] + return when (index) { + COL_ID -> (record[AVRO_COL_ID] as Long).toString() + COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString() + COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long) + COL_WAIT_TIME 
-> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long) + COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long) + COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) + COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() + COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_GROUP_ID -> record["group_id"] as Int - TASK_USER_ID -> record["user_id"] as Int + return when (index) { + COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt() + COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int + COL_USER_ID -> record[AVRO_COL_USER_ID] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } override fun close() { reader.close() } + + /** + * Initialize the columns for the reader based on [schema]. 
+ */ + private fun initColumns(schema: Schema) { + try { + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos() + AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos() + AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos() + AVRO_COL_RUNTIME = schema.getField("runtime").pos() + AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos() + AVRO_COL_PARENTS = schema.getField("parents").pos() + AVRO_COL_CHILDREN = schema.getField("children").pos() + AVRO_COL_GROUP_ID = schema.getField("group_id").pos() + AVRO_COL_USER_ID = schema.getField("user_id").pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema", e) + } + } + + private var AVRO_COL_ID = -1 + private var AVRO_COL_WORKFLOW_ID = -1 + private var AVRO_COL_SUBMIT_TIME = -1 + private var AVRO_COL_WAIT_TIME = -1 + private var AVRO_COL_RUNTIME = -1 + private var AVRO_COL_REQ_NCPUS = -1 + private var AVRO_COL_PARENTS = -1 + private var AVRO_COL_CHILDREN = -1 + private var AVRO_COL_GROUP_ID = -1 + private var AVRO_COL_USER_ID = -1 + + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_WAIT_TIME = 3 + private val COL_RUNTIME = 4 + private val COL_REQ_NCPUS = 5 + private val COL_PARENTS = 6 + private val COL_CHILDREN = 7 + private val COL_GROUP_ID = 8 + private val COL_USER_ID = 9 + + private val columns = mapOf( + TASK_ID to COL_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_WAIT_TIME to COL_WAIT_TIME, + TASK_RUNTIME to COL_RUNTIME, + TASK_REQ_NCPUS to COL_REQ_NCPUS, + TASK_PARENTS to COL_PARENTS, + TASK_CHILDREN to COL_CHILDREN, + TASK_GROUP_ID to COL_GROUP_ID, + TASK_USER_ID to COL_USER_ID, + ) } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt 
b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index 781cb335..ef88d295 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -22,10 +22,12 @@ package org.opendc.trace.wtf +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path /** * A [TraceFormat] implementation for the Workflow Trace Format (WTF). @@ -33,9 +35,44 @@ import kotlin.io.path.exists public class WtfTraceFormat : TraceFormat { override val name: String = "wtf" - override fun open(url: URL): WtfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WtfTrace(path) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ), + listOf(TASK_SUBMIT_TIME) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) + WtfTaskTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): 
TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index b155f265..09c3703a 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -35,51 +34,25 @@ import java.time.Instant * Test suite for the [WtfTraceFormat] class. */ class WtfTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - org.junit.jupiter.api.assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = WtfTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) - - assertEquals(listOf(TABLE_TASKS), trace.tables) + val path = Paths.get("src/test/resources/wtf-trace") + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - org.junit.jupiter.api.assertDoesNotThrow { 
table!!.newReader() } + val path = Paths.get("src/test/resources/wtf-trace") + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/wtf-trace") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } /** @@ -87,9 +60,8 @@ class WtfTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/wtf-trace") - val trace = WtfTraceFormat().open(input.toURI().toURL()) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/wtf-trace") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -111,13 +83,4 @@ class WtfTraceFormatTest { reader.close() } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } |
