summaryrefslogtreecommitdiff
path: root/opendc-trace
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt22
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt54
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt14
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt33
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt107
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt151
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt28
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt41
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt (renamed from opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt)38
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt (renamed from opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt)35
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt (renamed from opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt)29
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt54
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt110
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt127
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt48
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt54
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt62
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt77
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt52
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt138
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt94
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt45
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt77
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt137
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt104
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt61
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt33
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt84
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt44
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt55
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt56
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt69
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt46
-rw-r--r--opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt61
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt53
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt82
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt123
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt53
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt82
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt106
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt97
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt49
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt60
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt72
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt47
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt53
-rw-r--r--opendc-trace/opendc-trace-tools/build.gradle.kts11
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt105
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt56
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt62
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt47
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt42
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt58
-rw-r--r--opendc-trace/opendc-trace-wtf/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt61
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt132
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt51
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt57
61 files changed, 1838 insertions, 2101 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
index 219002e0..f1977945 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
@@ -29,28 +29,40 @@ import java.time.Instant
* Identifier of the resource.
*/
@JvmField
-public val RESOURCE_ID: TableColumn<String> = stringColumn("resource:id")
+public val RESOURCE_ID: TableColumn<String> = column("resource:id")
+
+/**
+ * The cluster to which the resource belongs.
+ */
+@JvmField
+public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id")
/**
* Start time for the resource.
*/
@JvmField
-public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:start_time", Instant::class.java)
+public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time")
/**
* End time for the resource.
*/
@JvmField
-public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java)
+public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time")
/**
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count")
+public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count")
+
+/**
+ * Total CPU capacity of the resource in MHz.
+ */
+@JvmField
+public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity")
/**
* Memory capacity for the resource in KB.
*/
@JvmField
-public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity")
+public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
index b683923b..44762da5 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
@@ -27,103 +27,73 @@ import java.time.Duration
import java.time.Instant
/**
- * Identifier of the resource.
- */
-@JvmField
-public val RESOURCE_STATE_ID: TableColumn<String> = stringColumn("resource_state:id")
-
-/**
- * The cluster to which the resource belongs.
- */
-@JvmField
-public val RESOURCE_STATE_CLUSTER_ID: TableColumn<String> = stringColumn("resource_state:cluster_id")
-
-/**
* Timestamp for the state.
*/
@JvmField
-public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = TableColumn("resource_state:timestamp", Instant::class.java)
+public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp")
/**
* Duration for the state.
*/
@JvmField
-public val RESOURCE_STATE_DURATION: TableColumn<Duration> = TableColumn("resource_state:duration", Duration::class.java)
+public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration")
/**
* A flag to indicate that the resource is powered on.
*/
@JvmField
-public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("resource_state:powered_on")
-
-/**
- * Number of CPUs for the resource.
- */
-@JvmField
-public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count")
-
-/**
- * Total CPU capacity of the resource in MHz.
- */
-@JvmField
-public val RESOURCE_STATE_CPU_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:cpu_capacity")
+public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on")
/**
* Total CPU usage of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = doubleColumn("resource_state:cpu_usage")
+public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage")
/**
* Total CPU usage of the resource in percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_usage_pct")
+public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct")
/**
* Total CPU demand of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = doubleColumn("resource_state:cpu_demand")
+public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand")
/**
* CPU ready percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_ready_pct")
-
-/**
- * Memory capacity of the resource in KB.
- */
-@JvmField
-public val RESOURCE_STATE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:mem_capacity")
+public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct")
/**
* Memory usage of the resource in KB.
*/
@JvmField
-public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = doubleColumn("resource_state:mem_usage")
+public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage")
/**
* Disk read throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = doubleColumn("resource_state:disk_read")
+public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read")
/**
* Disk write throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = doubleColumn("resource_state:disk_write")
+public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write")
/**
* Network receive throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_RX: TableColumn<Double> = doubleColumn("resource_state:net_rx")
+public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx")
/**
* Network transmit throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_TX: TableColumn<Double> = doubleColumn("resource_state:net_tx")
+public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index 6aca2051..b0181cbc 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -32,14 +32,14 @@ public interface Table {
public val name: String
/**
- * A flag to indicate that the table is synthetic (derived from another table).
+ * The list of columns supported in this table.
*/
- public val isSynthetic: Boolean
+ public val columns: List<TableColumn<*>>
/**
- * The list of columns supported in this table.
+ * The columns by which the table is partitioned.
*/
- public val columns: List<TableColumn<*>>
+ public val partitionKeys: List<TableColumn<*>>
/**
* Open a [TableReader] for this table.
@@ -47,7 +47,9 @@ public interface Table {
public fun newReader(): TableReader
/**
- * Open a [TableReader] for [partition] of the table.
+ * Open a [TableWriter] for this table.
+ *
+ * @throws UnsupportedOperationException if writing is not supported by the table.
*/
- public fun newReader(partition: String): TableReader
+ public fun newWriter(): TableWriter
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
index 64920498..31a58360 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
@@ -24,36 +24,11 @@
package org.opendc.trace
/**
- * Construct a [TableColumn] with [Any] type.
+ * Construct a [TableColumn] with the specified [name] and type [T].
*/
-public fun objectColumn(name: String): TableColumn<Any> = TableColumn(name, Any::class.java)
+public inline fun <reified T> column(name: String): TableColumn<T> = column(name, T::class.java)
/**
- * Construct a [TableColumn] with a [String] type.
+ * Construct a [TableColumn] with the specified [name] and [type].
*/
-public fun stringColumn(name: String): TableColumn<String> = TableColumn(name, String::class.java)
-
-/**
- * Construct a [TableColumn] with a [Number] type.
- */
-public fun numberColumn(name: String): TableColumn<Number> = TableColumn(name, Number::class.java)
-
-/**
- * Construct a [TableColumn] with an [Int] type.
- */
-public fun intColumn(name: String): TableColumn<Int> = TableColumn(name, Int::class.java)
-
-/**
- * Construct a [TableColumn] with a [Long] type.
- */
-public fun longColumn(name: String): TableColumn<Long> = TableColumn(name, Long::class.java)
-
-/**
- * Construct a [TableColumn] with a [Double] type.
- */
-public fun doubleColumn(name: String): TableColumn<Double> = TableColumn(name, Double::class.java)
-
-/**
- * Construct a [TableColumn] with a [Boolean] type.
- */
-public fun booleanColumn(name: String): TableColumn<Boolean> = TableColumn(name, Boolean::class.java)
+public fun <T> column(name: String, type: Class<T>): TableColumn<T> = TableColumn(name, type)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
index b5e7669f..8a796e6c 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
@@ -34,34 +34,129 @@ public interface TableReader : AutoCloseable {
public fun nextRow(): Boolean
/**
+ * Resolve the index of the specified [column] for this reader.
+ *
+ * @param column The column to lookup.
+ * @return The zero-based index of the column or a negative value if the column is not present in this table.
+ */
+ public fun resolve(column: TableColumn<*>): Int
+
+ /**
* Determine whether the [TableReader] supports the specified [column].
*/
- public fun hasColumn(column: TableColumn<*>): Boolean
+ public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+
+ /**
+ * Determine whether the specified [column] has a `null` value for the current row.
+ *
+ * @param index The zero-based index of the column to check for a null value.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
+ */
+ public fun isNull(index: Int): Boolean
+
+ /**
+ * Obtain the object value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The object value of the column.
+ */
+ public fun get(index: Int): Any?
+
+ /**
+ * Obtain the boolean value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The boolean value of the column or `false` if the column is `null`.
+ */
+ public fun getBoolean(index: Int): Boolean
+
+ /**
+ * Obtain the integer value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The integer value of the column or `0` if the column is `null`.
+ */
+ public fun getInt(index: Int): Int
+
+ /**
+ * Obtain the double value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The long value of the column or `0` if the column is `null`.
+ */
+ public fun getLong(index: Int): Long
+
+ /**
+ * Obtain the double value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The double value of the column or [Double.NaN] if the column is `null`.
+ */
+ public fun getDouble(index: Int): Double
+
+ /**
+ * Determine whether the specified [column] has a `null` value for the current row.
+ *
+ * @param column The column to lookup.
+ * @throws IllegalArgumentException if the column is not valid for this table.
+ * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
+ */
+ public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column))
/**
* Obtain the value of the current column with type [T].
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The object value of the column.
*/
- public fun <T> get(column: TableColumn<T>): T
+ public fun <T> get(column: TableColumn<T>): T {
+ // This cast should always succeed since the resolve the index of the typed column
+ @Suppress("UNCHECKED_CAST")
+ return get(resolve(column)) as T
+ }
/**
* Read the specified [column] as boolean.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The boolean value of the column or `false` if the column is `null`.
*/
- public fun getBoolean(column: TableColumn<Boolean>): Boolean
+ public fun getBoolean(column: TableColumn<Boolean>): Boolean = getBoolean(resolve(column))
/**
* Read the specified [column] as integer.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The integer value of the column or `0` if the column is `null`.
*/
- public fun getInt(column: TableColumn<Int>): Int
+ public fun getInt(column: TableColumn<Int>): Int = getInt(resolve(column))
/**
* Read the specified [column] as long.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The long value of the column or `0` if the column is `null`.
*/
- public fun getLong(column: TableColumn<Long>): Long
+ public fun getLong(column: TableColumn<Long>): Long = getLong(resolve(column))
/**
* Read the specified [column] as double.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The double value of the column or [Double.NaN] if the column is `null`.
*/
- public fun getDouble(column: TableColumn<Double>): Double
+ public fun getDouble(column: TableColumn<Double>): Double = getDouble(resolve(column))
/**
* Closes the reader so that no further iteration or data access can be made.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
new file mode 100644
index 00000000..423ce86a
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+/**
+ * Base class for writing workload traces.
+ */
+public interface TableWriter : AutoCloseable {
+ /**
+ * Start a new row in the table.
+ */
+ public fun startRow()
+
+ /**
+ * Flush the current row to the table.
+ */
+ public fun endRow()
+
+ /**
+ * Resolve the index of the specified [column] for this writer.
+ *
+ * @param column The column to lookup.
+ * @return The zero-based index of the column or a negative value if the column is not present in this table.
+ */
+ public fun resolve(column: TableColumn<*>): Int
+
+ /**
+ * Determine whether the [TableReader] supports the specified [column].
+ */
+ public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+
+ /**
+ * Set [column] to [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun set(index: Int, value: Any)
+
+ /**
+ * Set [column] to boolean [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The boolean value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setBoolean(index: Int, value: Boolean)
+
+ /**
+ * Set [column] to integer [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The integer value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInt(index: Int, value: Int)
+
+ /**
+ * Set [column] to long [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The long value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setLong(index: Int, value: Long)
+
+ /**
+ * Set [column] to double [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The double value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setDouble(index: Int, value: Double)
+
+ /**
+ * Set [column] to [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value)
+
+ /**
+ * Set [column] to boolean [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The boolean value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value)
+
+ /**
+ * Set [column] to integer [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The integer value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value)
+
+ /**
+ * Set [column] to long [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The long value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value)
+
+ /**
+ * Set [column] to double [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The double value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value)
+
+ /**
+ * Flush any buffered content to the underlying target.
+ */
+ public fun flush()
+
+ /**
+ * Close the writer so that no more rows can be written.
+ */
+ public override fun close()
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
index 46920dce..d103bce4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
@@ -30,72 +30,70 @@ import java.time.Instant
* A column containing the task identifier.
*/
@JvmField
-public val TASK_ID: TableColumn<String> = stringColumn("task:id")
+public val TASK_ID: TableColumn<String> = column("task:id")
/**
* A column containing the identifier of the workflow.
*/
@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id")
/**
- * A column containing the submit time of the task.
+ * A column containing the submission time of the task.
*/
@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java)
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time")
/**
* A column containing the wait time of the task.
*/
@JvmField
-public val TASK_WAIT_TIME: TableColumn<Instant> = TableColumn("task:wait_time", type = Instant::class.java)
+public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time")
/**
* A column containing the runtime time of the task.
*/
@JvmField
-public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java)
+public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime")
/**
* A column containing the parents of a task.
*/
-@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>)
+public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents")
/**
* A column containing the children of a task.
*/
-@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>)
+public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children")
/**
* A column containing the requested CPUs of a task.
*/
@JvmField
-public val TASK_REQ_NCPUS: TableColumn<Int> = intColumn("task:req_ncpus")
+public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus")
/**
* A column containing the allocated CPUs of a task.
*/
@JvmField
-public val TASK_ALLOC_NCPUS: TableColumn<Int> = intColumn("task:alloc_ncpus")
+public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus")
/**
* A column containing the status of a task.
*/
@JvmField
-public val TASK_STATUS: TableColumn<Int> = intColumn("task:status")
+public val TASK_STATUS: TableColumn<Int> = column("task:status")
/**
* A column containing the group id of a task.
*/
@JvmField
-public val TASK_GROUP_ID: TableColumn<Int> = intColumn("task:group_id")
+public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id")
/**
* A column containing the user id of a task.
*/
@JvmField
-public val TASK_USER_ID: TableColumn<Int> = intColumn("task:user_id")
+public val TASK_USER_ID: TableColumn<Int> = column("task:user_id")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
index 0ae45e86..64e8f272 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -22,9 +22,9 @@
package org.opendc.trace
+import org.opendc.trace.internal.TraceImpl
import org.opendc.trace.spi.TraceFormat
import java.io.File
-import java.net.URL
import java.nio.file.Path
/**
@@ -48,31 +48,48 @@ public interface Trace {
public companion object {
/**
- * Open a [Trace] at the specified [url] in the given [format].
+ * Open a [Trace] at the specified [path] in the given [format].
*
+ * @param path The path to the trace.
+ * @param format The format of the trace to open.
* @throws IllegalArgumentException if [format] is not supported.
*/
- public fun open(url: URL, format: String): Trace {
- val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
- return provider.open(url)
- }
+ @JvmStatic
+ public fun open(path: File, format: String): Trace = open(path.toPath(), format)
/**
* Open a [Trace] at the specified [path] in the given [format].
*
+ * @param path The [Path] to the trace.
+ * @param format The format of the trace to open.
* @throws IllegalArgumentException if [format] is not supported.
*/
- public fun open(path: File, format: String): Trace {
- return open(path.toURI().toURL(), format)
+ @JvmStatic
+ public fun open(path: Path, format: String): Trace {
+ val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+ return TraceImpl(provider, path)
}
/**
- * Open a [Trace] at the specified [path] in the given [format].
+ * Create a [Trace] at the specified [path] in the given [format].
*
- * @throws IllegalArgumentException if [format] is not supported.
+ * @param path The [Path] to the trace.
+ * @param format The format of the trace to create.
*/
- public fun open(path: Path, format: String): Trace {
- return open(path.toUri().toURL(), format)
+ @JvmStatic
+ public fun create(path: File, format: String): Trace = create(path.toPath(), format)
+
+ /**
+ * Create a [Trace] at the specified [path] in the given [format].
+ *
+ * @param path The [Path] to the trace.
+ * @param format The format of the trace to create.
+ */
+ @JvmStatic
+ public fun create(path: Path, format: String): Trace {
+ val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+ provider.create(path)
+ return TraceImpl(provider, path)
}
}
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
index a755a107..24551edb 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -20,28 +20,36 @@
* SOFTWARE.
*/
-package org.opendc.trace.wtf
+package org.opendc.trace.internal
-import org.opendc.trace.TABLE_TASKS
import org.opendc.trace.Table
-import org.opendc.trace.Trace
-import java.nio.file.Path
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import java.util.*
/**
- * [Trace] implementation for the WTF format.
+ * Internal implementation of [Table].
*/
-public class WtfTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_TASKS)
+internal class TableImpl(val trace: TraceImpl, override val name: String) : Table {
+ /**
+ * The details of this table.
+ */
+ private val details = trace.format.getDetails(trace.path, name)
- override fun containsTable(name: String): Boolean = TABLE_TASKS == name
+ override val columns: List<TableColumn<*>>
+ get() = details.columns
- override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
- }
+ override val partitionKeys: List<TableColumn<*>>
+ get() = details.partitionKeys
- return WtfTaskTable(path)
- }
+ override fun newReader(): TableReader = trace.format.newReader(trace.path, name)
- override fun toString(): String = "WtfTrace[$path]"
+ override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name)
+
+ override fun toString(): String = "Table[name=$name]"
+
+ override fun hashCode(): Int = Objects.hash(trace, name)
+
+ override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt
index 3e5029b4..fd9536ab 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt
@@ -20,30 +20,37 @@
* SOFTWARE.
*/
-package org.opendc.trace.opendc
+package org.opendc.trace.internal
-import org.opendc.trace.TABLE_RESOURCES
-import org.opendc.trace.TABLE_RESOURCE_STATES
import org.opendc.trace.Table
import org.opendc.trace.Trace
+import org.opendc.trace.spi.TraceFormat
import java.nio.file.Path
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
/**
- * A [Trace] in the OpenDC virtual machine trace format.
+ * Internal implementation of the [Trace] interface.
*/
-public class OdcVmTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace {
+ /**
+ * A map containing the [TableImpl] instances associated with the trace.
+ */
+ private val tableMap = ConcurrentHashMap<String, TableImpl>()
- override fun containsTable(name: String): Boolean =
- name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES
+ override val tables: List<String> = format.getTables(path)
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_RESOURCES -> OdcVmResourceTable(path)
- TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path)
- else -> null
+ init {
+ for (table in tables) {
+ tableMap.computeIfAbsent(table) { TableImpl(this, it) }
}
}
- override fun toString(): String = "OdcVmTrace[$path]"
+ override fun containsTable(name: String): Boolean = tableMap.containsKey(name)
+
+ override fun getTable(name: String): Table? = tableMap[name]
+
+ override fun hashCode(): Int = Objects.hash(format, path)
+
+ override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt
index d4da735e..1a9b9ee1 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt
@@ -20,27 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.trace.swf
+package org.opendc.trace.spi
-import org.opendc.trace.TABLE_TASKS
import org.opendc.trace.Table
-import org.opendc.trace.Trace
-import java.nio.file.Path
+import org.opendc.trace.TableColumn
/**
- * [Trace] implementation for the SWF format.
+ * A class used by the [TraceFormat] interface for describing the metadata of a [Table].
+ *
+ * @param columns The available columns in the table.
+ * @param partitionKeys The table columns that act as partition keys for the table.
*/
-public class SwfTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_TASKS)
-
- override fun containsTable(name: String): Boolean = TABLE_TASKS == name
-
- override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
- }
- return SwfTaskTable(path)
- }
-
- override fun toString(): String = "SwfTrace[$path]"
-}
+public data class TableDetails(
+ val columns: List<TableColumn<*>>,
+ val partitionKeys: List<TableColumn<*>> = emptyList()
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index 54029fcf..f2e610db 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -22,8 +22,9 @@
package org.opendc.trace.spi
-import org.opendc.trace.Trace
-import java.net.URL
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import java.nio.file.Path
import java.util.*
/**
@@ -36,11 +37,52 @@ public interface TraceFormat {
public val name: String
/**
- * Open a new [Trace] with this provider.
+ * Construct an empty trace at [path].
*
- * @param url A reference to the trace.
+ * @param path The path where to create the empty trace.
+ * @throws IllegalArgumentException If [path] is invalid.
+ * @throws UnsupportedOperationException If the table does not support trace creation.
*/
- public fun open(url: URL): Trace
+ public fun create(path: Path)
+
+ /**
+ * Return the name of the tables available in the trace at the specified [path].
+ *
+ * @param path The path to the trace.
+ * @return The list of tables available in the trace.
+ */
+ public fun getTables(path: Path): List<String>
+
+ /**
+ * Return the details of [table] in the trace at the specified [path].
+ *
+ * @param path The path to the trace.
+ * @param table The name of the table to obtain the details for.
+ * @throws IllegalArgumentException If [table] does not exist.
+ * @return The [TableDetails] for the specified [table].
+ */
+ public fun getDetails(path: Path, table: String): TableDetails
+
+ /**
+ * Open a [TableReader] for the specified [table].
+ *
+ * @param path The path to the trace to open.
+ * @param table The name of the table to open a [TableReader] for.
+ * @throws IllegalArgumentException If [table] does not exist.
+ * @return A [TableReader] instance for the table.
+ */
+ public fun newReader(path: Path, table: String): TableReader
+
+ /**
+ * Open a [TableWriter] for the specified [table].
+ *
+ * @param path The path to the trace to open.
+ * @param table The name of the table to open a [TableWriter] for.
+ * @throws IllegalArgumentException If [table] does not exist.
+ * @throws UnsupportedOperationException If the format does not support writing.
+ * @return A [TableWriter] instance for the table.
+ */
+ public fun newWriter(path: Path, table: String): TableWriter
/**
* A helper object for resolving providers.
@@ -49,6 +91,7 @@ public interface TraceFormat {
/**
* A list of [TraceFormat] that are available on this system.
*/
+ @JvmStatic
public val installedProviders: List<TraceFormat> by lazy {
val loader = ServiceLoader.load(TraceFormat::class.java)
loader.toList()
@@ -57,6 +100,7 @@ public interface TraceFormat {
/**
* Obtain a [TraceFormat] implementation by [name].
*/
+ @JvmStatic
public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name }
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
new file mode 100644
index 00000000..dafc0798
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+
+/**
+ * A helper class to chain multiple [TableReader]s.
+ */
+public abstract class CompositeTableReader : TableReader {
+ /**
+ * A flag to indicate that the reader has starting, meaning the user called [nextRow] at least once
+ * (and in turn [nextReader]).
+ */
+ private var hasStarted = false
+
+ /**
+ * The active [TableReader] instance.
+ */
+ private var delegate: TableReader? = null
+
+ /**
+ * Obtain the next [TableReader] instance to read from or `null` if there are no more readers to read from.
+ */
+ protected abstract fun nextReader(): TableReader?
+
+ override fun nextRow(): Boolean {
+ if (!hasStarted) {
+ assert(delegate == null) { "Duplicate initialization" }
+ delegate = nextReader()
+ hasStarted = true
+ }
+
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextReader()
+ this.delegate = delegate
+ }
+
+ return delegate != null
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ val delegate = delegate
+ return delegate?.resolve(column) ?: -1
+ }
+
+ override fun isNull(index: Int): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.isNull(index)
+ }
+
+ override fun get(index: Int): Any? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(index)
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(index)
+ }
+
+ override fun getInt(index: Int): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(index)
+ }
+
+ override fun getLong(index: Int): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(index)
+ }
+
+ override fun getDouble(index: Int): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(index)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ override fun toString(): String = "CompositeTableReader"
+}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
deleted file mode 100644
index 84c9b347..00000000
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resource state [Table] for the Azure v1 VM traces.
- */
-internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
- /**
- * The partitions that belong to the table.
- */
- private val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCE_STATES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_CPU_USAGE_PCT
- )
-
- override fun newReader(): TableReader {
- val it = partitions.iterator()
-
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- this.delegate = delegate
- }
-
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
- return if (it.hasNext()) {
- val (_, path) = it.next()
- return AzureResourceStateTableReader(factory.createParser(path.toFile()))
- } else {
- null
- }
- }
-
- override fun toString(): String = "AzureCompositeTableReader"
- }
- }
-
- override fun newReader(partition: String): TableReader {
- val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
- return AzureResourceStateTableReader(factory.createParser(path.toFile()))
- }
-
- override fun toString(): String = "AzureResourceStateTable"
-}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index c17a17ab..da8181fe 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -60,42 +60,37 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_STATE_ID -> id
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_TIMESTAMP -> timestamp
+ COL_CPU_USAGE_PCT -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column index")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
throw IllegalArgumentException("Invalid column")
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_CPU_USAGE_PCT -> cpuUsagePct
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -133,6 +128,15 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
cpuUsagePct = Double.NaN
}
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_CPU_USAGE_PCT = 2
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt
deleted file mode 100644
index 96ee3158..00000000
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * The resource [Table] for the Azure v1 VM traces.
- */
-internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table {
- override val name: String = TABLE_RESOURCES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_CPU_COUNT,
- RESOURCE_MEM_CAPACITY
- )
-
- override fun newReader(): TableReader {
- return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile()))
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("No partition $partition")
- }
-
- override fun toString(): String = "AzureResourceTable"
-}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index 5ea97483..a6352613 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -62,49 +62,42 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_START_TIME -> startTime
- RESOURCE_STOP_TIME -> stopTime
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_START_TIME -> startTime
+ COL_STOP_TIME -> stopTime
+ COL_CPU_COUNT -> getInt(index)
+ COL_MEM_CAPACITY -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_CPU_COUNT -> cpuCores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_MEM_CAPACITY -> memCapacity
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_MEM_CAPACITY -> memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -138,7 +131,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
/**
* Reset the state.
*/
- fun reset() {
+ private fun reset() {
id = null
startTime = null
stopTime = null
@@ -146,6 +139,19 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
memCapacity = Double.NaN
}
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt
deleted file mode 100644
index c7e7dc36..00000000
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the Azure v1 VM traces.
- */
-public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean = name in tables
-
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_RESOURCES -> AzureResourceTable(factory, path)
- TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path)
- else -> null
- }
- }
-
- override fun toString(): String = "AzureTrace[$path]"
-}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 1230d857..253c7057 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -24,10 +24,15 @@ package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
/**
* A format implementation for the Azure v1 format.
@@ -45,12 +50,68 @@ public class AzureTraceFormat : TraceFormat {
.enable(CsvParser.Feature.ALLOW_COMMENTS)
.enable(CsvParser.Feature.TRIM_SPACES)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_RESOURCES -> TableDetails(
+ listOf(
+ RESOURCE_ID,
+ RESOURCE_START_TIME,
+ RESOURCE_STOP_TIME,
+ RESOURCE_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY
+ )
+ )
+ TABLE_RESOURCE_STATES -> TableDetails(
+ listOf(
+ RESOURCE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_CPU_USAGE_PCT
+ ),
+ listOf(RESOURCE_STATE_TIMESTAMP)
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_RESOURCES -> AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile()))
+ TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
/**
- * Open the trace file.
+ * Construct a [TableReader] for reading over all VM CPU readings.
*/
- override fun open(url: URL): AzureTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return AzureTrace(factory, path)
+ private fun newResourceStateReader(path: Path): TableReader {
+ val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+ val it = partitions.iterator()
+
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
+ return if (it.hasNext()) {
+ val (_, partPath) = it.next()
+ return AzureResourceStateTableReader(factory.createParser(partPath.toFile()))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "AzureCompositeTableReader"
+ }
}
}
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index e5735f0d..b73bb728 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
-import java.io.File
-import java.net.URL
+import java.nio.file.Paths
/**
* Test suite for the [AzureTraceFormat] class.
@@ -36,54 +35,29 @@ class AzureTraceFormatTest {
private val format = AzureTraceFormat()
@Test
- fun testTraceExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
- assertDoesNotThrow {
- format.open(url)
- }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
- assertThrows<IllegalArgumentException> {
- format.open(URL(url.toString() + "help"))
- }
- }
-
- @Test
fun testTables() {
- val url = File("src/test/resources/trace").toURI().toURL()
- val trace = format.open(url)
+ val path = Paths.get("src/test/resources/trace")
- assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path))
}
@Test
fun testTableExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
- val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+ val path = Paths.get("src/test/resources/trace")
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) }
}
@Test
fun testTableDoesNotExist() {
- val url = File("src/test/resources/trace").toURI().toURL()
- val trace = format.open(url)
-
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ val path = Paths.get("src/test/resources/trace")
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
@Test
fun testResources() {
- val url = File("src/test/resources/trace").toURI().toURL()
- val trace = format.open(url)
-
- val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
-
+ val path = Paths.get("src/test/resources/trace")
+ val reader = format.newReader(path, TABLE_RESOURCES)
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
@@ -96,14 +70,12 @@ class AzureTraceFormatTest {
@Test
fun testSmoke() {
- val url = File("src/test/resources/trace").toURI().toURL()
- val trace = format.open(url)
-
- val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+ val path = Paths.get("src/test/resources/trace")
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_STATE_ID)) },
+ { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_ID)) },
{ assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
{ assertEquals(2.86979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) }
)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
deleted file mode 100644
index 4a60dff3..00000000
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.bufferedReader
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resource state [Table] in the extended Bitbrains format.
- */
-internal class BitbrainsExResourceStateTable(path: Path) : Table {
- /**
- * The partitions that belong to the table.
- */
- private val partitions = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "txt" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCE_STATES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_CLUSTER_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_CPU_COUNT,
- RESOURCE_STATE_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT,
- RESOURCE_STATE_CPU_DEMAND,
- RESOURCE_STATE_CPU_READY_PCT,
- RESOURCE_STATE_MEM_CAPACITY,
- RESOURCE_STATE_DISK_READ,
- RESOURCE_STATE_DISK_WRITE,
- )
-
- override fun newReader(): TableReader {
- val it = partitions.iterator()
-
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- this.delegate = delegate
- }
-
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
- return if (it.hasNext()) {
- val (_, path) = it.next()
- val reader = path.bufferedReader()
- return BitbrainsExResourceStateTableReader(reader)
- } else {
- null
- }
- }
-
- override fun toString(): String = "SvCompositeTableReader"
- }
- }
-
- override fun newReader(partition: String): TableReader {
- val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
- val reader = path.bufferedReader()
- return BitbrainsExResourceStateTableReader(reader)
- }
-
- override fun toString(): String = "BitbrainsExResourceStateTable"
-}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
index f1cf7307..c1b6f5ba 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -88,70 +88,53 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_CLUSTER_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_CPU_COUNT -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_CPU_DEMAND -> true
- RESOURCE_STATE_CPU_READY_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_MAX) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_STATE_ID -> id
- RESOURCE_STATE_CLUSTER_ID -> cluster
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
- RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
- RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
- RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
- RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
- RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ)
- RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_CLUSTER_ID -> cluster
+ COL_TIMESTAMP -> timestamp
+ COL_NCPUS -> getInt(index)
+ COL_POWERED_ON -> getInt(index)
+ COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_CPU_READY_PCT, COL_CPU_DEMAND, COL_MEM_CAPACITY, COL_DISK_READ, COL_DISK_WRITE -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- return when (column) {
- RESOURCE_STATE_POWERED_ON -> poweredOn
+ override fun getBoolean(index: Int): Boolean {
+ return when (index) {
+ COL_POWERED_ON -> poweredOn
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_STATE_CPU_COUNT -> cpuCores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
- RESOURCE_STATE_CPU_DEMAND -> cpuDemand
- RESOURCE_STATE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_CPU_CAPACITY -> cpuCapacity
+ COL_CPU_USAGE -> cpuUsage
+ COL_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
+ COL_CPU_READY_PCT -> cpuReadyPct
+ COL_CPU_DEMAND -> cpuDemand
+ COL_MEM_CAPACITY -> memCapacity
+ COL_DISK_READ -> diskRead
+ COL_DISK_WRITE -> diskWrite
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -209,4 +192,21 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
private val COL_CPU_CAPACITY = 18
private val COL_ID = 19
private val COL_MEM_CAPACITY = 20
+ private val COL_CPU_USAGE_PCT = 21
+ private val COL_MAX = COL_CPU_USAGE_PCT + 1
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_CLUSTER_ID to COL_CLUSTER_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_CPU_COUNT to COL_NCPUS,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT,
+ RESOURCE_STATE_CPU_DEMAND to COL_CPU_DEMAND,
+ RESOURCE_STATE_CPU_READY_PCT to COL_CPU_READY_PCT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ RESOURCE_STATE_DISK_READ to COL_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE
+ )
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt
deleted file mode 100644
index f16c493d..00000000
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the extended Bitbrains format.
- */
-public class BitbrainsExTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
-
- override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
- }
-
- return BitbrainsExResourceStateTable(path)
- }
-
- override fun toString(): String = "BitbrainsExTrace[$path]"
-}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 06388a84..20222c8a 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -22,10 +22,16 @@
package org.opendc.trace.bitbrains
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.bufferedReader
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
/**
* A format implementation for the extended Bitbrains trace format.
@@ -36,12 +42,67 @@ public class BitbrainsExTraceFormat : TraceFormat {
*/
override val name: String = "bitbrains-ex"
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_RESOURCE_STATES -> TableDetails(
+ listOf(
+ RESOURCE_ID,
+ RESOURCE_CLUSTER_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT,
+ RESOURCE_STATE_CPU_DEMAND,
+ RESOURCE_STATE_CPU_READY_PCT,
+ RESOURCE_MEM_CAPACITY,
+ RESOURCE_STATE_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE
+ ),
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
/**
- * Open the trace file.
+ * Construct a [TableReader] for reading over all resource state partitions.
*/
- override fun open(url: URL): BitbrainsExTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return BitbrainsExTrace(path)
+ private fun newResourceStateReader(path: Path): TableReader {
+ val partitions = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "txt" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+ val it = partitions.iterator()
+
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
+ return if (it.hasNext()) {
+ val (_, partPath) = it.next()
+ return BitbrainsExResourceStateTableReader(partPath.bufferedReader())
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsExCompositeTableReader"
+ }
}
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
deleted file mode 100644
index 7241b18b..00000000
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resource state [Table] in the Bitbrains format.
- */
-internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
- /**
- * The partitions that belong to the table.
- */
- private val partitions =
- Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCE_STATES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_CPU_COUNT,
- RESOURCE_STATE_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT,
- RESOURCE_STATE_MEM_CAPACITY,
- RESOURCE_STATE_MEM_USAGE,
- RESOURCE_STATE_DISK_READ,
- RESOURCE_STATE_DISK_WRITE,
- RESOURCE_STATE_NET_RX,
- RESOURCE_STATE_NET_TX,
- )
-
- override fun newReader(): TableReader {
- val it = partitions.iterator()
-
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- this.delegate = delegate
- }
-
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
- return if (it.hasNext()) {
- val (partition, path) = it.next()
- return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile()))
- } else {
- null
- }
- }
-
- override fun toString(): String = "BitbrainsCompositeTableReader"
- }
- }
-
- override fun newReader(partition: String): TableReader {
- val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
- return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile()))
- }
-
- override fun toString(): String = "BitbrainsResourceStateTable"
-}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 56e66f5c..3a8839b4 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -111,71 +111,49 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_CPU_COUNT -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_MEM_USAGE -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- RESOURCE_STATE_NET_RX -> true
- RESOURCE_STATE_NET_TX -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_STATE_ID -> partition
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_CPU_COUNT -> cpuCores
- RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_MEM_USAGE -> memUsage
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
- RESOURCE_STATE_NET_RX -> netReceived
- RESOURCE_STATE_NET_TX -> netTransmitted
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> partition
+ COL_TIMESTAMP -> timestamp
+ COL_CPU_COUNT -> getInt(index)
+ COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_MEM_CAPACITY, COL_MEM_USAGE, COL_DISK_READ, COL_DISK_WRITE, COL_NET_RX, COL_NET_TX -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_STATE_CPU_COUNT -> cpuCores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_MEM_USAGE -> memUsage
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
- RESOURCE_STATE_NET_RX -> netReceived
- RESOURCE_STATE_NET_TX -> netTransmitted
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_CPU_CAPACITY -> cpuCapacity
+ COL_CPU_USAGE -> cpuUsage
+ COL_CPU_USAGE_PCT -> cpuUsagePct
+ COL_MEM_CAPACITY -> memCapacity
+ COL_MEM_USAGE -> memUsage
+ COL_DISK_READ -> diskRead
+ COL_DISK_WRITE -> diskWrite
+ COL_NET_RX -> netReceived
+ COL_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -249,6 +227,34 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
netTransmitted = Double.NaN
}
+ private val COL_TIMESTAMP = 0
+ private val COL_CPU_COUNT = 1
+ private val COL_CPU_CAPACITY = 2
+ private val COL_CPU_USAGE = 3
+ private val COL_CPU_USAGE_PCT = 4
+ private val COL_MEM_CAPACITY = 5
+ private val COL_MEM_USAGE = 6
+ private val COL_DISK_READ = 7
+ private val COL_DISK_WRITE = 8
+ private val COL_NET_RX = 9
+ private val COL_NET_TX = 10
+ private val COL_ID = 11
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ RESOURCE_STATE_MEM_USAGE to COL_MEM_USAGE,
+ RESOURCE_STATE_DISK_READ to COL_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE,
+ RESOURCE_STATE_NET_RX to COL_NET_RX,
+ RESOURCE_STATE_NET_TX to COL_NET_TX
+ )
+
/**
* The type of the timestamp in the trace.
*/
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt
deleted file mode 100644
index bc4f0b7d..00000000
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resources [Table] in the Bitbrains format.
- */
-internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table {
- /**
- * The VMs that belong to the table.
- */
- private val vms =
- Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCES
-
- override val isSynthetic: Boolean = true
-
- override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID)
-
- override fun newReader(): TableReader {
- return BitbrainsResourceTableReader(factory, vms)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Unknown partition $partition")
- }
-
- override fun toString(): String = "BitbrainsResourceTable"
-}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
index c02dc5ae..3701994a 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
@@ -43,13 +43,14 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
val parser = factory.createParser(path.toFile())
val reader = BitbrainsResourceStateTableReader(name, parser)
+ val idCol = reader.resolve(RESOURCE_ID)
try {
if (!reader.nextRow()) {
continue
}
- id = reader.get(RESOURCE_STATE_ID)
+ id = reader.get(idCol) as String
return true
} finally {
reader.close()
@@ -59,36 +60,33 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
return false
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
throw IllegalArgumentException("Invalid column")
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -105,4 +103,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
private fun reset() {
id = null
}
+
+ private val COL_ID = 0
+ private val columns = mapOf(RESOURCE_ID to COL_ID)
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
deleted file mode 100644
index bcd8dd52..00000000
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the Bitbrains format.
- */
-public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean = tables.contains(name)
-
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_RESOURCES -> BitbrainsResourceTable(factory, path)
- TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path)
- else -> null
- }
- }
-
- override fun toString(): String = "BitbrainsTrace[$path]"
-}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index 55b11fe3..3885c931 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -24,10 +24,15 @@ package org.opendc.trace.bitbrains
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
/**
* A format implementation for the GWF trace format.
@@ -45,12 +50,75 @@ public class BitbrainsTraceFormat : TraceFormat {
.enable(CsvParser.Feature.ALLOW_COMMENTS)
.enable(CsvParser.Feature.TRIM_SPACES)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID))
+ TABLE_RESOURCE_STATES -> TableDetails(
+ listOf(
+ RESOURCE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT,
+ RESOURCE_MEM_CAPACITY,
+ RESOURCE_STATE_MEM_USAGE,
+ RESOURCE_STATE_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE,
+ RESOURCE_STATE_NET_RX,
+ RESOURCE_STATE_NET_TX,
+ ),
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val vms = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+ BitbrainsResourceTableReader(factory, vms)
+ }
+ TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
/**
- * Open a Bitbrains trace.
+ * Construct a [TableReader] for reading over all resource state partitions.
*/
- override fun open(url: URL): BitbrainsTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return BitbrainsTrace(factory, path)
+ private fun newResourceStateReader(path: Path): TableReader {
+ val partitions = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+ val it = partitions.iterator()
+
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
+ return if (it.hasNext()) {
+ val (partition, partPath) = it.next()
+ return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile()))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsCompositeTableReader"
+ }
}
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
index 2e4f176a..d734cf5f 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -26,62 +26,38 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
-import java.net.URL
+import java.nio.file.Paths
/**
* Test suite for the [BitbrainsExTraceFormat] class.
*/
-class BitbrainsExTraceFormatTest {
+internal class BitbrainsExTraceFormatTest {
private val format = BitbrainsExTraceFormat()
@Test
- fun testTraceExists() {
- val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
- assertDoesNotThrow {
- format.open(url)
- }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
- assertThrows<IllegalArgumentException> {
- format.open(URL(url.toString() + "help"))
- }
- }
-
- @Test
fun testTables() {
- val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
- val trace = format.open(url)
+ val path = Paths.get("src/test/resources/vm.txt")
- assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables)
+ assertEquals(listOf(TABLE_RESOURCE_STATES), format.getTables(path))
}
@Test
fun testTableExists() {
- val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
- val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+ val path = Paths.get("src/test/resources/vm.txt")
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) }
}
@Test
fun testTableDoesNotExist() {
- val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
- val trace = format.open(url)
-
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ val path = Paths.get("src/test/resources/vm.txt")
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
@Test
fun testSmoke() {
- val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
- val trace = format.open(url)
-
- val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+ val path = Paths.get("src/test/resources/vm.txt")
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index ff4a33f8..41e7def2 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -26,66 +26,38 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
-import java.net.URL
+import java.nio.file.Paths
/**
* Test suite for the [BitbrainsTraceFormat] class.
*/
class BitbrainsTraceFormatTest {
- @Test
- fun testTraceExists() {
- val format = BitbrainsTraceFormat()
- val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
- assertDoesNotThrow {
- format.open(url)
- }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val format = BitbrainsTraceFormat()
- val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
- assertThrows<IllegalArgumentException> {
- format.open(URL(url.toString() + "help"))
- }
- }
+ private val format = BitbrainsTraceFormat()
@Test
fun testTables() {
- val format = BitbrainsTraceFormat()
- val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
- val trace = format.open(url)
+ val path = Paths.get("src/test/resources/bitbrains.csv")
- assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path))
}
@Test
fun testTableExists() {
- val format = BitbrainsTraceFormat()
- val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
- val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+ val path = Paths.get("src/test/resources/bitbrains.csv")
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) }
}
@Test
fun testTableDoesNotExist() {
- val format = BitbrainsTraceFormat()
- val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
- val trace = format.open(url)
-
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ val path = Paths.get("src/test/resources/bitbrains.csv")
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
@Test
fun testResources() {
- val format = BitbrainsTraceFormat()
- val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
- val trace = format.open(url)
-
- val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
+ val path = Paths.get("src/test/resources/bitbrains.csv")
+ val reader = format.newReader(path, TABLE_RESOURCES)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -98,11 +70,8 @@ class BitbrainsTraceFormatTest {
@Test
fun testSmoke() {
- val format = BitbrainsTraceFormat()
- val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
- val trace = format.open(url)
-
- val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+ val path = Paths.get("src/test/resources/bitbrains.csv")
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
deleted file mode 100644
index fd7bd068..00000000
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.gwf
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.net.URL
-
-/**
- * A [Table] containing the tasks in a GWF trace.
- */
-internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table {
- override val name: String = TABLE_TASKS
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- TASK_WORKFLOW_ID,
- TASK_ID,
- TASK_SUBMIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_ALLOC_NCPUS,
- TASK_PARENTS
- )
-
- override fun newReader(): TableReader {
- return GwfTaskTableReader(factory.createParser(url))
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Invalid partition $partition")
- }
-
- override fun toString(): String = "GwfTaskTable"
-}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 39eb5520..aa4c543b 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -67,52 +67,43 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_WORKFLOW_ID -> true
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- TASK_WORKFLOW_ID -> workflowId
- TASK_ID -> jobId
- TASK_SUBMIT_TIME -> submitTime
- TASK_RUNTIME -> runtime
- TASK_REQ_NCPUS -> nProcs
- TASK_ALLOC_NCPUS -> reqNProcs
- TASK_PARENTS -> dependencies
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_JOB_ID -> jobId
+ COL_WORKFLOW_ID -> workflowId
+ COL_SUBMIT_TIME -> submitTime
+ COL_RUNTIME -> runtime
+ COL_REQ_NPROC -> getInt(index)
+ COL_NPROC -> getInt(index)
+ COL_DEPS -> dependencies
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- TASK_REQ_NCPUS -> nProcs
- TASK_ALLOC_NCPUS -> reqNProcs
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NPROC -> reqNProcs
+ COL_NPROC -> nProcs
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -180,6 +171,24 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
dependencies = emptySet()
}
+ private val COL_WORKFLOW_ID = 0
+ private val COL_JOB_ID = 1
+ private val COL_SUBMIT_TIME = 2
+ private val COL_RUNTIME = 3
+ private val COL_NPROC = 4
+ private val COL_REQ_NPROC = 5
+ private val COL_DEPS = 6
+
+ private val columns = mapOf(
+ TASK_ID to COL_JOB_ID,
+ TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_RUNTIME to COL_RUNTIME,
+ TASK_ALLOC_NCPUS to COL_NPROC,
+ TASK_REQ_NCPUS to COL_REQ_NPROC,
+ TASK_PARENTS to COL_DEPS
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt
deleted file mode 100644
index 166c1e56..00000000
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.gwf
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.net.URL
-
-/**
- * [Trace] implementation for the GWF format.
- */
-public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace {
- override val tables: List<String> = listOf(TABLE_TASKS)
-
- override fun containsTable(name: String): Boolean = TABLE_TASKS == name
-
- override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
- }
-
- return GwfTaskTable(factory, url)
- }
-
- override fun toString(): String = "GwfTrace[$url]"
-}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index 6d542503..d4287420 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -24,10 +24,10 @@ package org.opendc.trace.gwf
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import java.nio.file.Path
/**
* A [TraceFormat] implementation for the GWF trace format.
@@ -45,12 +45,38 @@ public class GwfTraceFormat : TraceFormat {
.enable(CsvParser.Feature.ALLOW_COMMENTS)
.enable(CsvParser.Feature.TRIM_SPACES)
- /**
- * Read the tasks in the GWF trace.
- */
- public override fun open(url: URL): GwfTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return GwfTrace(factory, url)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_TASKS -> TableDetails(
+ listOf(
+ TASK_WORKFLOW_ID,
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS,
+ ),
+ listOf(TASK_WORKFLOW_ID)
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index b209b979..7fe403b2 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -22,13 +22,10 @@
package org.opendc.trace.gwf
+import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.*
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
-import java.net.URL
+import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
@@ -36,59 +33,32 @@ import java.time.Instant
* Test suite for the [GwfTraceFormat] class.
*/
internal class GwfTraceFormatTest {
- @Test
- fun testTraceExists() {
- val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
- val format = GwfTraceFormat()
- assertDoesNotThrow {
- format.open(input)
- }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
- val format = GwfTraceFormat()
- assertThrows<IllegalArgumentException> {
- format.open(URL(input.toString() + "help"))
- }
- }
+ private val format = GwfTraceFormat()
@Test
fun testTables() {
- val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
- val format = GwfTraceFormat()
- val trace = format.open(input)
+ val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- assertEquals(listOf(TABLE_TASKS), trace.tables)
+ assertEquals(listOf(TABLE_TASKS), format.getTables(path))
}
@Test
fun testTableExists() {
- val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
- val format = GwfTraceFormat()
- val table = format.open(input).getTable(TABLE_TASKS)
-
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
+ assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) }
}
@Test
fun testTableDoesNotExist() {
- val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
- val format = GwfTraceFormat()
- val trace = format.open(input)
+ val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
@Test
fun testTableReader() {
- val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
- val format = GwfTraceFormat()
- val table = format.open(input).getTable(TABLE_TASKS)!!
- val reader = table.newReader()
+ val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
+ val reader = format.newReader(path, TABLE_TASKS)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -99,13 +69,4 @@ internal class GwfTraceFormatTest {
{ assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
)
}
-
- @Test
- fun testTableReaderPartition() {
- val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf"))
- val format = GwfTraceFormat()
- val table = format.open(input).getTable(TABLE_TASKS)!!
-
- assertThrows<IllegalArgumentException> { table.newReader("test") }
- }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
deleted file mode 100644
index bee4ba7e..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.nio.file.Path
-
-/**
- * The resource state [Table] in the OpenDC virtual machine trace format.
- */
-internal class OdcVmResourceStateTable(private val path: Path) : Table {
- override val name: String = TABLE_RESOURCE_STATES
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_DURATION,
- RESOURCE_STATE_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE,
- )
-
- override fun newReader(): TableReader {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
- return OdcVmResourceStateTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Unknown partition $partition")
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index df3bcfa6..b5043f82 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_CPU_COUNT -> true
- RESOURCE_STATE_CPU_USAGE -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_STATE_ID -> record[COL_ID].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long)
- RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
- RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
+ return when (index) {
+ COL_ID -> record[AVRO_COL_ID].toString()
+ COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long)
+ COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long)
+ COL_CPU_COUNT -> getInt(index)
+ COL_CPU_USAGE -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
+ return when (index) {
+ COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble()
+ return when (index) {
+ COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
*/
private fun initColumns(schema: Schema) {
try {
- COL_ID = schema.getField("id").pos()
- COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
- COL_DURATION = schema.getField("duration").pos()
- COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ AVRO_COL_DURATION = schema.getField("duration").pos()
+ AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
} catch (e: NullPointerException) {
// This happens when the field we are trying to access does not exist
throw IllegalArgumentException("Invalid schema", e)
}
}
- private var COL_ID = -1
- private var COL_TIMESTAMP = -1
- private var COL_DURATION = -1
- private var COL_CPU_COUNT = -1
- private var COL_CPU_USAGE = -1
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_TIMESTAMP = -1
+ private var AVRO_COL_DURATION = -1
+ private var AVRO_COL_CPU_COUNT = -1
+ private var AVRO_COL_CPU_USAGE = -1
+
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_DURATION to COL_DURATION,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
new file mode 100644
index 00000000..15a8cb85
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.trace.*
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [TableWriter] implementation for the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceStateTableWriter(
+ private val writer: ParquetWriter<GenericRecord>,
+ private val schema: Schema
+) : TableWriter {
+ /**
+ * The current builder for the record that is being written.
+ */
+ private var builder: GenericRecordBuilder? = null
+
+ /**
+ * The fields belonging to the resource state schema.
+ */
+ private val fields = schema.fields
+
+ override fun startRow() {
+ builder = GenericRecordBuilder(schema)
+ }
+
+ override fun endRow() {
+ val builder = checkNotNull(builder) { "No active row" }
+ this.builder = null
+
+ val record = builder.build()
+ val id = record[COL_ID] as String
+ val timestamp = record[COL_TIMESTAMP] as Long
+
+ check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+
+ writer.write(builder.build())
+
+ lastId = id
+ lastTimestamp = timestamp
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ val schema = schema
+ return when (column) {
+ RESOURCE_ID -> schema.getField("id").pos()
+ RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ RESOURCE_STATE_DURATION -> schema.getField("duration").pos()
+ RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ else -> -1
+ }
+ }
+
+ override fun set(index: Int, value: Any) {
+ val builder = checkNotNull(builder) { "No active row" }
+
+ builder.set(
+ fields[index],
+ when (index) {
+ COL_TIMESTAMP -> (value as Instant).toEpochMilli()
+ COL_DURATION -> (value as Duration).toMillis()
+ else -> value
+ }
+ )
+ }
+
+ override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+
+ override fun setInt(index: Int, value: Int) = set(index, value)
+
+ override fun setLong(index: Int, value: Long) = set(index, value)
+
+ override fun setDouble(index: Int, value: Double) = set(index, value)
+
+ override fun flush() {
+ // Not available
+ }
+
+ override fun close() {
+ writer.close()
+ }
+
+ /**
+ * Last column values that are used to check for correct partitioning.
+ */
+ private var lastId: String? = null
+ private var lastTimestamp: Long = Long.MIN_VALUE
+
+ /**
+ * Columns with special behavior.
+ */
+ private val COL_ID = resolve(RESOURCE_ID)
+ private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP)
+ private val COL_DURATION = resolve(RESOURCE_STATE_DURATION)
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
deleted file mode 100644
index b1456560..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.nio.file.Path
-
-/**
- * The resource [Table] for the OpenDC virtual machine trace format.
- */
-internal class OdcVmResourceTable(private val path: Path) : Table {
- override val name: String = TABLE_RESOURCES
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_CPU_COUNT,
- RESOURCE_MEM_CAPACITY,
- )
-
- override fun newReader(): TableReader {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
- return OdcVmResourceTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Unknown partition $partition")
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index c52da62d..d93929aa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_ID -> record[COL_ID].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long)
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ return when (index) {
+ COL_ID -> record[AVRO_COL_ID].toString()
+ COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long)
+ COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long)
+ COL_CPU_COUNT -> getInt(index)
+ COL_MEM_CAPACITY -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
+ return when (index) {
+ COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble()
+ return when (index) {
+ COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
*/
private fun initColumns(schema: Schema) {
try {
- COL_ID = schema.getField("id").pos()
- COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
} catch (e: NullPointerException) {
// This happens when the field we are trying to access does not exist
throw IllegalArgumentException("Invalid schema")
}
}
- private var COL_ID = -1
- private var COL_START_TIME = -1
- private var COL_STOP_TIME = -1
- private var COL_CPU_COUNT = -1
- private var COL_MEM_CAPACITY = -1
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_START_TIME = -1
+ private var AVRO_COL_STOP_TIME = -1
+ private var AVRO_COL_CPU_COUNT = -1
+ private var AVRO_COL_MEM_CAPACITY = -1
+
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
new file mode 100644
index 00000000..9cc6ca7d
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.trace.*
+import java.time.Instant
+import kotlin.math.roundToLong
+
+/**
+ * A [TableWriter] implementation for the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceTableWriter(
+ private val writer: ParquetWriter<GenericRecord>,
+ private val schema: Schema
+) : TableWriter {
+ /**
+ * The current builder for the record that is being written.
+ */
+ private var builder: GenericRecordBuilder? = null
+
+ /**
+ * The fields belonging to the resource schema.
+ */
+ private val fields = schema.fields
+
+ override fun startRow() {
+ builder = GenericRecordBuilder(schema)
+ }
+
+ override fun endRow() {
+ val builder = checkNotNull(builder) { "No active row" }
+ this.builder = null
+ writer.write(builder.build())
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ val schema = schema
+ return when (column) {
+ RESOURCE_ID -> schema.getField("id").pos()
+ RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ else -> -1
+ }
+ }
+
+ override fun set(index: Int, value: Any) {
+ val builder = checkNotNull(builder) { "No active row" }
+ builder.set(
+ fields[index],
+ when (index) {
+ COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli()
+ COL_MEM_CAPACITY -> (value as Double).roundToLong()
+ else -> value
+ }
+ )
+ }
+
+ override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+
+ override fun setInt(index: Int, value: Int) = set(index, value)
+
+ override fun setLong(index: Int, value: Long) = set(index, value)
+
+ override fun setDouble(index: Int, value: Double) = set(index, value)
+
+ override fun flush() {
+ // Not available
+ }
+
+ override fun close() {
+ writer.close()
+ }
+
+ /**
+ * Columns with special behavior.
+ */
+ private val COL_START_TIME = resolve(RESOURCE_START_TIME)
+ private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME)
+ private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY)
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 8edba725..9b32f8fd 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -24,11 +24,18 @@ package org.opendc.trace.opendc
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalOutputFile
+import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import java.nio.file.Files
+import java.nio.file.Path
/**
* A [TraceFormat] implementation of the OpenDC virtual machine trace format.
@@ -39,13 +46,83 @@ public class OdcVmTraceFormat : TraceFormat {
*/
override val name: String = "opendc-vm"
- /**
- * Open a Bitbrains Parquet trace.
- */
- override fun open(url: URL): OdcVmTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return OdcVmTrace(path)
+ override fun create(path: Path) {
+ // Construct directory containing the trace files
+ Files.createDirectory(path)
+
+ val tables = getTables(path)
+
+ for (table in tables) {
+ val writer = newWriter(path, table)
+ writer.close()
+ }
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_RESOURCES -> TableDetails(
+ listOf(
+ RESOURCE_ID,
+ RESOURCE_START_TIME,
+ RESOURCE_STOP_TIME,
+ RESOURCE_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY,
+ )
+ )
+ TABLE_RESOURCE_STATES -> TableDetails(
+ listOf(
+ RESOURCE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_DURATION,
+ RESOURCE_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE,
+ ),
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
+ OdcVmResourceTableReader(reader)
+ }
+ TABLE_RESOURCE_STATES -> {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
+ OdcVmResourceStateTableReader(reader)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val schema = RESOURCES_SCHEMA
+ val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet")))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ OdcVmResourceTableWriter(writer, schema)
+ }
+ TABLE_RESOURCE_STATES -> {
+ val schema = RESOURCE_STATES_SCHEMA
+ val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet")))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withDictionaryEncoding("id", true)
+ .withBloomFilterEnabled("id", true)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ OdcVmResourceStateTableWriter(writer, schema)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
}
public companion object {
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index 42eb369e..bfe0f881 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -29,8 +29,7 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.*
-import java.io.File
-import java.net.URL
+import java.nio.file.Paths
/**
* Test suite for the [OdcVmTraceFormat] implementation.
@@ -39,52 +38,30 @@ internal class OdcVmTraceFormatTest {
private val format = OdcVmTraceFormat()
@Test
- fun testTraceExists() {
- val url = File("src/test/resources/trace-v2.1").toURI().toURL()
- assertDoesNotThrow { format.open(url) }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val url = File("src/test/resources/trace-v2.1").toURI().toURL()
- assertThrows<IllegalArgumentException> {
- format.open(URL(url.toString() + "help"))
- }
- }
-
- @Test
fun testTables() {
- val url = File("src/test/resources/trace-v2.1").toURI().toURL()
- val trace = format.open(url)
+ val path = Paths.get("src/test/resources/trace-v2.1")
- assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path))
}
@Test
fun testTableExists() {
- val url = File("src/test/resources/trace-v2.1").toURI().toURL()
- val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+ val path = Paths.get("src/test/resources/trace-v2.1")
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) }
}
@Test
fun testTableDoesNotExist() {
- val url = File("src/test/resources/trace-v2.1").toURI().toURL()
- val trace = format.open(url)
-
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ val path = Paths.get("src/test/resources/trace-v2.1")
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
@ParameterizedTest
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
- val url = File("src/test/resources/$name").toURI().toURL()
- val trace = format.open(url)
-
- val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
+ val path = Paths.get("src/test/resources/$name")
+ val reader = format.newReader(path, TABLE_RESOURCES)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -104,14 +81,12 @@ internal class OdcVmTraceFormatTest {
@ParameterizedTest
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
- val url = File("src/test/resources/$name").toURI().toURL()
- val trace = format.open(url)
-
- val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+ val path = Paths.get("src/test/resources/$name")
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_STATE_ID)) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
{ assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
{ assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
)
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
deleted file mode 100644
index 7ec0d607..00000000
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.swf
-
-import org.opendc.trace.*
-import java.nio.file.Path
-import kotlin.io.path.bufferedReader
-
-/**
- * A [Table] containing the tasks in a SWF trace.
- */
-internal class SwfTaskTable(private val path: Path) : Table {
- override val name: String = TABLE_TASKS
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- TASK_ID,
- TASK_SUBMIT_TIME,
- TASK_WAIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_ALLOC_NCPUS,
- TASK_PARENTS,
- TASK_STATUS,
- TASK_GROUP_ID,
- TASK_USER_ID
- )
-
- override fun newReader(): TableReader {
- val reader = path.bufferedReader()
- return SwfTaskTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Invalid partition $partition")
- }
-
- override fun toString(): String = "SwfTaskTable"
-}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 3f49c770..2f6ea6ee 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -69,64 +69,43 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- TASK_STATUS -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in columns.values) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any = when (column) {
- TASK_ID -> fields[COL_JOB_ID]
- TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10))
- TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10))
- TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10))
- TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
- TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS)
- TASK_PARENTS -> {
- val parent = fields[COL_PARENT_JOB].toLong(10)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_JOB_ID -> fields[index]
+ COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
+ COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
+ COL_PARENT_JOB -> {
+ val parent = fields[index].toLong(10)
if (parent < 0) emptySet() else setOf(parent)
}
- TASK_STATUS -> getInt(TASK_STATUS)
- TASK_GROUP_ID -> getInt(TASK_GROUP_ID)
- TASK_USER_ID -> getInt(TASK_USER_ID)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10)
- TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10)
- TASK_STATUS -> fields[COL_STATUS].toInt(10)
- TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10)
- TASK_USER_ID -> fields[COL_USER_ID].toInt(10)
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -155,4 +134,17 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
private val COL_PART_NUM = 15
private val COL_PARENT_JOB = 16
private val COL_PARENT_THINK_TIME = 17
+
+ private val columns = mapOf(
+ TASK_ID to COL_JOB_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_WAIT_TIME to COL_WAIT_TIME,
+ TASK_RUNTIME to COL_RUN_TIME,
+ TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS,
+ TASK_REQ_NCPUS to COL_REQ_NCPUS,
+ TASK_STATUS to COL_STATUS,
+ TASK_USER_ID to COL_USER_ID,
+ TASK_GROUP_ID to COL_GROUP_ID,
+ TASK_PARENTS to COL_PARENT_JOB
+ )
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index 36c3122e..1fd076d5 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -22,10 +22,11 @@
package org.opendc.trace.swf
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import java.nio.file.Path
+import kotlin.io.path.bufferedReader
/**
* Support for the Standard Workload Format (SWF) in OpenDC.
@@ -35,9 +36,41 @@ import kotlin.io.path.exists
public class SwfTraceFormat : TraceFormat {
override val name: String = "swf"
- override fun open(url: URL): SwfTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return SwfTrace(path)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_TASKS -> TableDetails(
+ listOf(
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_WAIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS,
+ TASK_STATUS,
+ TASK_GROUP_ID,
+ TASK_USER_ID
+ ),
+ emptyList()
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 828c2bfa..4dcd43f6 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -27,61 +27,38 @@ import org.junit.jupiter.api.Assertions.*
import org.opendc.trace.TABLE_TASKS
import org.opendc.trace.TASK_ALLOC_NCPUS
import org.opendc.trace.TASK_ID
-import java.net.URL
+import java.nio.file.Paths
/**
* Test suite for the [SwfTraceFormat] class.
*/
internal class SwfTraceFormatTest {
- @Test
- fun testTraceExists() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val format = SwfTraceFormat()
- assertDoesNotThrow {
- format.open(input)
- }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val format = SwfTraceFormat()
- assertThrows<IllegalArgumentException> {
- format.open(URL(input.toString() + "help"))
- }
- }
+ private val format = SwfTraceFormat()
@Test
fun testTables() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
- assertEquals(listOf(TABLE_TASKS), trace.tables)
+ assertEquals(listOf(TABLE_TASKS), format.getTables(path))
}
@Test
fun testTableExists() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS)
-
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
+ assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) }
}
@Test
fun testTableDoesNotExist() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
@Test
fun testReader() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
- val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
+ val reader = format.newReader(path, TABLE_TASKS)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -94,14 +71,4 @@ internal class SwfTraceFormatTest {
reader.close()
}
-
- @Test
- fun testReaderPartition() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
-
- assertThrows<IllegalArgumentException> {
- trace.getTable(TABLE_TASKS)!!.newReader("test")
- }
- }
}
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts
index 35190dba..14a0fc7c 100644
--- a/opendc-trace/opendc-trace-tools/build.gradle.kts
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -29,19 +29,18 @@ plugins {
}
application {
- mainClass.set("org.opendc.trace.tools.TraceConverterKt")
+ mainClass.set("org.opendc.trace.tools.TraceConverter")
}
dependencies {
api(platform(projects.opendcPlatform))
- implementation(projects.opendcTrace.opendcTraceParquet)
- implementation(projects.opendcTrace.opendcTraceOpendc)
- implementation(projects.opendcTrace.opendcTraceAzure)
- implementation(projects.opendcTrace.opendcTraceBitbrains)
-
+ implementation(projects.opendcTrace.opendcTraceApi)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
+ runtimeOnly(projects.opendcTrace.opendcTraceOpendc)
+ runtimeOnly(projects.opendcTrace.opendcTraceBitbrains)
+ runtimeOnly(projects.opendcTrace.opendcTraceAzure)
runtimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index 322464cd..6fad43be 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -20,6 +20,7 @@
* SOFTWARE.
*/
+@file:JvmName("TraceConverter")
package org.opendc.trace.tools
import com.github.ajalt.clikt.core.CliktCommand
@@ -29,28 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.*
import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
-import org.opendc.trace.azure.AzureTraceFormat
-import org.opendc.trace.bitbrains.BitbrainsExTraceFormat
-import org.opendc.trace.bitbrains.BitbrainsTraceFormat
-import org.opendc.trace.opendc.OdcVmTraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.File
+import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
-import kotlin.math.roundToLong
/**
* A script to convert a trace in text format into a Parquet trace.
*/
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
/**
* Represents the command for converting traces
@@ -77,15 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The input format of the trace.
*/
- private val format by option("-f", "--format", help = "input format of trace")
- .choice(
- "solvinity" to BitbrainsExTraceFormat(),
- "bitbrains" to BitbrainsTraceFormat(),
- "azure" to AzureTraceFormat()
- )
+ private val inputFormat by option("-f", "--input-format", help = "format of output trace")
.required()
/**
+ * The format of the output trace.
+ */
+ private val outputFormat by option("--output-format", help = "format of output trace")
+ .default("opendc-vm")
+
+ /**
* The sampling options.
*/
private val samplingOptions by SamplingOptions().cooccurring()
@@ -101,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
- val trace = format.open(input.toURI().toURL())
+ val inputTrace = Trace.open(input, format = inputFormat)
+ val outputTrace = Trace.create(output, format = outputFormat)
logger.info { "Building resources table" }
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enablePageWriteChecksum()
- .build()
+ val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter()
- val selectedVms = metaWriter.use { convertResources(trace, it) }
+ val selectedVms = metaWriter.use { convertResources(inputTrace, it) }
if (selectedVms.isEmpty()) {
logger.warn { "No VMs selected" }
@@ -121,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
logger.info { "Wrote ${selectedVms.size} rows" }
logger.info { "Building resource states table" }
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withBloomFilterNDV("id", selectedVms.size.toLong())
- .enableValidation()
- .build()
+ val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter()
- val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+ val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) }
logger.info { "Wrote $statesCount rows" }
}
/**
* Convert the resources table for the trace.
*/
- private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+ private fun convertResources(trace: Trace, writer: TableWriter): Set<String> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
@@ -154,39 +137,37 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var stopTime = Long.MIN_VALUE
do {
- id = reader.get(RESOURCE_STATE_ID)
+ id = reader.get(RESOURCE_ID)
val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
startTime = min(startTime, timestamp)
stopTime = max(stopTime, timestamp)
- numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT))
+ numCpus = max(numCpus, reader.getInt(RESOURCE_CPU_COUNT))
- memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
+ memCapacity = max(memCapacity, reader.getDouble(RESOURCE_MEM_CAPACITY))
if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
}
hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+ } while (hasNextRow && id == reader.get(RESOURCE_ID))
// Sample only a fraction of the VMs
if (random != null && random.nextDouble() > samplingFraction) {
continue
}
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
-
- builder["id"] = id
- builder["start_time"] = startTime
- builder["stop_time"] = stopTime
- builder["cpu_count"] = numCpus
- builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
-
logger.info { "Selecting VM $id" }
-
- writer.write(builder.build())
selectedVms.add(id)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
+ writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+ writer.setInt(RESOURCE_CPU_COUNT, numCpus)
+ writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
+ writer.endRow()
}
return selectedVms
@@ -195,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* Convert the resource states table for the trace.
*/
- private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+ private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
var hasNextRow = reader.nextRow()
@@ -204,14 +185,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var lastTimestamp = 0L
while (hasNextRow) {
- val id = reader.get(RESOURCE_STATE_ID)
+ val id = reader.get(RESOURCE_ID)
if (id !in selectedVms) {
hasNextRow = reader.nextRow()
continue
}
- val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ val cpuCount = reader.getInt(RESOURCE_CPU_COUNT)
val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
@@ -233,20 +214,18 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
break
}
- val shouldContinue = id == reader.get(RESOURCE_STATE_ID) &&
+ val shouldContinue = id == reader.get(RESOURCE_ID) &&
abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 &&
- cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ cpuCount == reader.getInt(RESOURCE_CPU_COUNT)
} while (shouldContinue)
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
-
- builder["id"] = id
- builder["timestamp"] = startTimestamp
- builder["duration"] = duration
- builder["cpu_count"] = cpuCount
- builder["cpu_usage"] = cpuUsage
-
- writer.write(builder.build())
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+ writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+ writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+ writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+ writer.endRow()
count++
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt
deleted file mode 100644
index 7b7f979f..00000000
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wfformat
-
-import com.fasterxml.jackson.core.JsonFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * A [Table] containing the tasks in a WfCommons workload trace.
- */
-internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table {
- override val name: String = TABLE_TASKS
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- TASK_ID,
- TASK_WORKFLOW_ID,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_PARENTS,
- TASK_CHILDREN
- )
-
- override fun newReader(): TableReader {
- val parser = factory.createParser(path.toFile())
- return WfFormatTaskTableReader(parser)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Invalid partition $partition")
- }
-
- override fun toString(): String = "WfFormatTaskTable"
-}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
index 4408ba5c..7f378d80 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -94,49 +94,41 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
return hasJob
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_WORKFLOW_ID -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_PARENTS -> true
- TASK_CHILDREN -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column value" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- TASK_ID -> id
- TASK_WORKFLOW_ID -> workflowId
- TASK_RUNTIME -> runtime
- TASK_PARENTS -> parents
- TASK_CHILDREN -> children
- TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_WORKFLOW_ID -> workflowId
+ COL_RUNTIME -> runtime
+ COL_PARENTS -> parents
+ COL_CHILDREN -> children
+ COL_NPROC -> getInt(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- TASK_REQ_NCPUS -> cores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_NPROC -> cores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -231,4 +223,20 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
children = null
cores = -1
}
+
+ private val COL_ID = 0
+ private val COL_WORKFLOW_ID = 1
+ private val COL_RUNTIME = 3
+ private val COL_NPROC = 4
+ private val COL_PARENTS = 5
+ private val COL_CHILDREN = 6
+
+ private val columns = mapOf(
+ TASK_ID to COL_ID,
+ TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
+ TASK_RUNTIME to COL_RUNTIME,
+ TASK_REQ_NCPUS to COL_NPROC,
+ TASK_PARENTS to COL_PARENTS,
+ TASK_CHILDREN to COL_CHILDREN,
+ )
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt
deleted file mode 100644
index 2d9c79fb..00000000
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wfformat
-
-import com.fasterxml.jackson.core.JsonFactory
-import org.opendc.trace.TABLE_TASKS
-import org.opendc.trace.Table
-import org.opendc.trace.Trace
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the WfCommons workload trace format.
- */
-public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_TASKS)
-
- override fun containsTable(name: String): Boolean = TABLE_TASKS == name
-
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_TASKS -> WfFormatTaskTable(factory, path)
- else -> null
- }
- }
-
- override fun toString(): String = "WfFormatTrace[$path]"
-}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index ff8d054c..c75e3cbb 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -23,10 +23,10 @@
package org.opendc.trace.wfformat
import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import java.nio.file.Path
/**
* A [TraceFormat] implementation for the WfCommons workload trace format.
@@ -39,9 +39,37 @@ public class WfFormatTraceFormat : TraceFormat {
override val name: String = "wfformat"
- override fun open(url: URL): WfFormatTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return WfFormatTrace(factory, path)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_TASKS -> TableDetails(
+ listOf(
+ TASK_ID,
+ TASK_WORKFLOW_ID,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_PARENTS,
+ TASK_CHILDREN
+ ),
+ emptyList()
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 0bfc8840..217b175d 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -22,59 +22,38 @@
package org.opendc.trace.wfformat
+import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
-import java.io.File
-import java.net.URL
+import java.nio.file.Paths
/**
* Test suite for the [WfFormatTraceFormat] class.
*/
class WfFormatTraceFormatTest {
- @Test
- fun testTraceExists() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val format = WfFormatTraceFormat()
- assertDoesNotThrow { format.open(input) }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val format = WfFormatTraceFormat()
- assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) }
- }
+ private val format = WfFormatTraceFormat()
@Test
fun testTables() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val format = WfFormatTraceFormat()
- val trace = format.open(input)
+ val path = Paths.get("src/test/resources/trace.json")
- assertEquals(listOf(TABLE_TASKS), trace.tables)
+ assertEquals(listOf(TABLE_TASKS), format.getTables(path))
}
@Test
fun testTableExists() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val format = WfFormatTraceFormat()
- val table = format.open(input).getTable(TABLE_TASKS)
-
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ val path = Paths.get("src/test/resources/trace.json")
+ Assertions.assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) }
}
@Test
fun testTableDoesNotExist() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val format = WfFormatTraceFormat()
- val trace = format.open(input)
+ val path = Paths.get("src/test/resources/trace.json")
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
/**
@@ -82,9 +61,8 @@ class WfFormatTraceFormatTest {
*/
@Test
fun testTableReader() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val trace = WfFormatTraceFormat().open(input)
- val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+ val path = Paths.get("src/test/resources/trace.json")
+ val reader = format.newReader(path, TABLE_TASKS)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -110,9 +88,8 @@ class WfFormatTraceFormatTest {
*/
@Test
fun testTableReaderFull() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val trace = WfFormatTraceFormat().open(input)
- val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+ val path = Paths.get("src/test/resources/trace.json")
+ val reader = format.newReader(path, TABLE_TASKS)
assertDoesNotThrow {
while (reader.nextRow()) {
@@ -121,13 +98,4 @@ class WfFormatTraceFormatTest {
reader.close()
}
}
-
- @Test
- fun testTableReaderPartition() {
- val input = File("src/test/resources/trace.json").toURI().toURL()
- val format = WfFormatTraceFormat()
- val table = format.open(input).getTable(TABLE_TASKS)!!
-
- assertThrows<IllegalArgumentException> { table.newReader("test") }
- }
}
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts
index 5051c7b0..e4f0ab3a 100644
--- a/opendc-trace/opendc-trace-wtf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts
@@ -34,4 +34,6 @@ dependencies {
api(projects.opendcTrace.opendcTraceApi)
implementation(projects.opendcTrace.opendcTraceParquet)
+
+ testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
deleted file mode 100644
index 74202718..00000000
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.nio.file.Path
-
-/**
- * A [Table] containing the tasks in a GWF trace.
- */
-internal class WtfTaskTable(private val path: Path) : Table {
- override val name: String = TABLE_TASKS
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- TASK_ID,
- TASK_WORKFLOW_ID,
- TASK_SUBMIT_TIME,
- TASK_WAIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_PARENTS,
- TASK_CHILDREN,
- TASK_GROUP_ID,
- TASK_USER_ID
- )
-
- override fun newReader(): TableReader {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
- return WtfTaskTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Invalid partition $partition")
- }
-
- override fun toString(): String = "WtfTaskTable"
-}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 5e2463f8..45ec25dd 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.wtf
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -37,73 +38,126 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_WORKFLOW_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_PARENTS -> true
- TASK_CHILDREN -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
-
@Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- TASK_ID -> (record["id"] as Long).toString()
- TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString()
- TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long)
- TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long)
- TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long)
- TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- TASK_GROUP_ID -> record["group_id"]
- TASK_USER_ID -> record["user_id"]
+ return when (index) {
+ COL_ID -> (record[AVRO_COL_ID] as Long).toString()
+ COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
+ COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
+ COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
+ COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+ COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
+ COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_GROUP_ID -> record["group_id"] as Int
- TASK_USER_ID -> record["user_id"] as Int
+ return when (index) {
+ COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
+ COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
+ COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
override fun close() {
reader.close()
}
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
+ AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
+ AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
+ AVRO_COL_RUNTIME = schema.getField("runtime").pos()
+ AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
+ AVRO_COL_PARENTS = schema.getField("parents").pos()
+ AVRO_COL_CHILDREN = schema.getField("children").pos()
+ AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
+ AVRO_COL_USER_ID = schema.getField("user_id").pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_WORKFLOW_ID = -1
+ private var AVRO_COL_SUBMIT_TIME = -1
+ private var AVRO_COL_WAIT_TIME = -1
+ private var AVRO_COL_RUNTIME = -1
+ private var AVRO_COL_REQ_NCPUS = -1
+ private var AVRO_COL_PARENTS = -1
+ private var AVRO_COL_CHILDREN = -1
+ private var AVRO_COL_GROUP_ID = -1
+ private var AVRO_COL_USER_ID = -1
+
+ private val COL_ID = 0
+ private val COL_WORKFLOW_ID = 1
+ private val COL_SUBMIT_TIME = 2
+ private val COL_WAIT_TIME = 3
+ private val COL_RUNTIME = 4
+ private val COL_REQ_NCPUS = 5
+ private val COL_PARENTS = 6
+ private val COL_CHILDREN = 7
+ private val COL_GROUP_ID = 8
+ private val COL_USER_ID = 9
+
+ private val columns = mapOf(
+ TASK_ID to COL_ID,
+ TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_WAIT_TIME to COL_WAIT_TIME,
+ TASK_RUNTIME to COL_RUNTIME,
+ TASK_REQ_NCPUS to COL_REQ_NCPUS,
+ TASK_PARENTS to COL_PARENTS,
+ TASK_CHILDREN to COL_CHILDREN,
+ TASK_GROUP_ID to COL_GROUP_ID,
+ TASK_USER_ID to COL_USER_ID,
+ )
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index 781cb335..ef88d295 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -22,10 +22,12 @@
package org.opendc.trace.wtf
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
/**
* A [TraceFormat] implementation for the Workflow Trace Format (WTF).
@@ -33,9 +35,44 @@ import kotlin.io.path.exists
public class WtfTraceFormat : TraceFormat {
override val name: String = "wtf"
- override fun open(url: URL): WtfTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return WtfTrace(path)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_TASKS -> TableDetails(
+ listOf(
+ TASK_ID,
+ TASK_WORKFLOW_ID,
+ TASK_SUBMIT_TIME,
+ TASK_WAIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_PARENTS,
+ TASK_CHILDREN,
+ TASK_GROUP_ID,
+ TASK_USER_ID
+ ),
+ listOf(TASK_SUBMIT_TIME)
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_TASKS -> {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
+ WtfTaskTableReader(reader)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index b155f265..09c3703a 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
-import java.io.File
-import java.net.URL
+import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
@@ -35,51 +34,25 @@ import java.time.Instant
* Test suite for the [WtfTraceFormat] class.
*/
class WtfTraceFormatTest {
- @Test
- fun testTraceExists() {
- val input = File("src/test/resources/wtf-trace").toURI().toURL()
- val format = WtfTraceFormat()
- org.junit.jupiter.api.assertDoesNotThrow {
- format.open(input)
- }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val input = File("src/test/resources/wtf-trace").toURI().toURL()
- val format = WtfTraceFormat()
- assertThrows<IllegalArgumentException> {
- format.open(URL(input.toString() + "help"))
- }
- }
+ private val format = WtfTraceFormat()
@Test
fun testTables() {
- val input = File("src/test/resources/wtf-trace").toURI().toURL()
- val format = WtfTraceFormat()
- val trace = format.open(input)
-
- assertEquals(listOf(TABLE_TASKS), trace.tables)
+ val path = Paths.get("src/test/resources/wtf-trace")
+ assertEquals(listOf(TABLE_TASKS), format.getTables(path))
}
@Test
fun testTableExists() {
- val input = File("src/test/resources/wtf-trace").toURI().toURL()
- val format = WtfTraceFormat()
- val table = format.open(input).getTable(TABLE_TASKS)
-
- assertNotNull(table)
- org.junit.jupiter.api.assertDoesNotThrow { table!!.newReader() }
+ val path = Paths.get("src/test/resources/wtf-trace")
+ assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) }
}
@Test
fun testTableDoesNotExist() {
- val input = File("src/test/resources/wtf-trace").toURI().toURL()
- val format = WtfTraceFormat()
- val trace = format.open(input)
+ val path = Paths.get("src/test/resources/wtf-trace")
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
/**
@@ -87,9 +60,8 @@ class WtfTraceFormatTest {
*/
@Test
fun testTableReader() {
- val input = File("src/test/resources/wtf-trace")
- val trace = WtfTraceFormat().open(input.toURI().toURL())
- val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+ val path = Paths.get("src/test/resources/wtf-trace")
+ val reader = format.newReader(path, TABLE_TASKS)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -111,13 +83,4 @@ class WtfTraceFormatTest {
reader.close()
}
-
- @Test
- fun testTableReaderPartition() {
- val input = File("src/test/resources/wtf-trace").toURI().toURL()
- val format = WtfTraceFormat()
- val table = format.open(input).getTable(TABLE_TASKS)!!
-
- assertThrows<IllegalArgumentException> { table.newReader("test") }
- }
}