author    Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-21 11:34:34 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-21 11:34:34 +0200
commit    68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (patch)
tree      73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-trace
parent    c7fff03408ee3109d0a39a96c043584a2d8f67ca (diff)
feat(trace): Add support for writing traces
This change adds a new API for writing traces in a given trace format. Writing is currently only supported by the OpenDC VM format; support for the other formats will be added over time.
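A minimal usage sketch of the new writing API (Kotlin), based on the interfaces and column constants introduced in this change; the output path and the example values are placeholders:

    import org.opendc.trace.*
    import java.nio.file.Paths
    import java.time.Instant

    fun main() {
        // Create an empty trace on disk in the OpenDC VM format
        // (currently the only format that supports writing).
        val trace = Trace.create(Paths.get("/tmp/example-trace"), format = "opendc-vm")

        // Obtain a writer for the resources table and emit a single row.
        val writer = checkNotNull(trace.getTable(TABLE_RESOURCES)).newWriter()
        writer.use {
            it.startRow()
            it.set(RESOURCE_ID, "vm-0")
            it.set(RESOURCE_START_TIME, Instant.parse("2021-09-21T00:00:00Z"))
            it.set(RESOURCE_STOP_TIME, Instant.parse("2021-09-22T00:00:00Z"))
            it.setInt(RESOURCE_CPU_COUNT, 4)
            it.setDouble(RESOURCE_MEM_CAPACITY, 4096.0)
            it.endRow()
        }
    }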
Diffstat (limited to 'opendc-trace')
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt                                    |   7
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt                              | 151
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt                                    |  30
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt                       |   3
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt                          |  21
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt                 |   8
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt   |   8
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt     |   8
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt                       |   8
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt | 123
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt      | 106
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt              |  43
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt                       |   8
-rw-r--r--  opendc-trace/opendc-trace-tools/build.gradle.kts                                                           |  11
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt                   |  82
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt        |   8
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt                       |   8
17 files changed, 576 insertions(+), 57 deletions(-)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index 031ee269..b0181cbc 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -45,4 +45,11 @@ public interface Table {
* Open a [TableReader] for this table.
*/
public fun newReader(): TableReader
+
+ /**
+ * Open a [TableWriter] for this table.
+ *
+ * @throws UnsupportedOperationException if writing is not supported by the table.
+ */
+ public fun newWriter(): TableWriter
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
new file mode 100644
index 00000000..423ce86a
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+/**
+ * Base interface for writing workload traces.
+ */
+public interface TableWriter : AutoCloseable {
+ /**
+ * Start a new row in the table.
+ */
+ public fun startRow()
+
+ /**
+ * Flush the current row to the table.
+ */
+ public fun endRow()
+
+ /**
+ * Resolve the index of the specified [column] for this writer.
+ *
+ * @param column The column to look up.
+ * @return The zero-based index of the column or a negative value if the column is not present in this table.
+ */
+ public fun resolve(column: TableColumn<*>): Int
+
+ /**
+ * Determine whether the [TableWriter] supports the specified [column].
+ */
+ public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+
+ /**
+ * Set the column at the specified [index] to [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun set(index: Int, value: Any)
+
+ /**
+ * Set the column at the specified [index] to the boolean [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The boolean value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setBoolean(index: Int, value: Boolean)
+
+ /**
+ * Set the column at the specified [index] to the integer [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The integer value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInt(index: Int, value: Int)
+
+ /**
+ * Set the column at the specified [index] to the long [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The long value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setLong(index: Int, value: Long)
+
+ /**
+ * Set the column at the specified [index] to the double [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The double value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setDouble(index: Int, value: Double)
+
+ /**
+ * Set [column] to [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value)
+
+ /**
+ * Set [column] to boolean [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The boolean value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value)
+
+ /**
+ * Set [column] to integer [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The integer value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value)
+
+ /**
+ * Set [column] to long [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The long value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value)
+
+ /**
+ * Set [column] to double [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The double value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value)
+
+ /**
+ * Flush any buffered content to the underlying target.
+ */
+ public fun flush()
+
+ /**
+ * Close the writer so that no more rows can be written.
+ */
+ public override fun close()
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
index 6d0014cb..64e8f272 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -51,21 +51,45 @@ public interface Trace {
* Open a [Trace] at the specified [path] in the given [format].
*
* @param path The path to the trace.
+ * @param format The format of the trace to open.
* @throws IllegalArgumentException if [format] is not supported.
*/
- public fun open(path: File, format: String): Trace {
- return open(path.toPath(), format)
- }
+ @JvmStatic
+ public fun open(path: File, format: String): Trace = open(path.toPath(), format)
/**
* Open a [Trace] at the specified [path] in the given [format].
*
* @param path The [Path] to the trace.
+ * @param format The format of the trace to open.
* @throws IllegalArgumentException if [format] is not supported.
*/
+ @JvmStatic
public fun open(path: Path, format: String): Trace {
val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
return TraceImpl(provider, path)
}
+
+ /**
+ * Create a [Trace] at the specified [path] in the given [format].
+ *
+ * @param path The path to the trace.
+ * @param format The format of the trace to create.
+ */
+ @JvmStatic
+ public fun create(path: File, format: String): Trace = create(path.toPath(), format)
+
+ /**
+ * Create a [Trace] at the specified [path] in the given [format].
+ *
+ * @param path The [Path] to the trace.
+ * @param format The format of the trace to create.
+ */
+ @JvmStatic
+ public fun create(path: Path, format: String): Trace {
+ val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+ provider.create(path)
+ return TraceImpl(provider, path)
+ }
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
index fd0a0f04..24551edb 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.internal
import org.opendc.trace.Table
import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
import java.util.*
/**
@@ -44,6 +45,8 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl
override fun newReader(): TableReader = trace.format.newReader(trace.path, name)
+ override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name)
+
override fun toString(): String = "Table[name=$name]"
override fun hashCode(): Int = Objects.hash(trace, name)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index e04dd948..f2e610db 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -23,6 +23,7 @@
package org.opendc.trace.spi
import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
import java.nio.file.Path
import java.util.*
@@ -36,6 +37,15 @@ public interface TraceFormat {
public val name: String
/**
+ * Construct an empty trace at [path].
+ *
+ * @param path The path where to create the empty trace.
+ * @throws IllegalArgumentException If [path] is invalid.
+ * @throws UnsupportedOperationException If the format does not support trace creation.
+ */
+ public fun create(path: Path)
+
+ /**
* Return the name of the tables available in the trace at the specified [path].
*
* @param path The path to the trace.
@@ -64,6 +74,17 @@ public interface TraceFormat {
public fun newReader(path: Path, table: String): TableReader
/**
+ * Open a [TableWriter] for the specified [table].
+ *
+ * @param path The path to the trace to open.
+ * @param table The name of the table to open a [TableWriter] for.
+ * @throws IllegalArgumentException If [table] does not exist.
+ * @throws UnsupportedOperationException If the format does not support writing.
+ * @return A [TableWriter] instance for the table.
+ */
+ public fun newWriter(path: Path, table: String): TableWriter
+
+ /**
* A helper object for resolving providers.
*/
public companion object {
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 77af0d81..253c7057 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -50,6 +50,10 @@ public class AzureTraceFormat : TraceFormat {
.enable(CsvParser.Feature.ALLOW_COMMENTS)
.enable(CsvParser.Feature.TRIM_SPACES)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -83,6 +87,10 @@ public class AzureTraceFormat : TraceFormat {
}
}
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
/**
* Construct a [TableReader] for reading over all VM CPU readings.
*/
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 080b73de..20222c8a 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -42,6 +42,10 @@ public class BitbrainsExTraceFormat : TraceFormat {
*/
override val name: String = "bitbrains-ex"
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -74,6 +78,10 @@ public class BitbrainsExTraceFormat : TraceFormat {
}
}
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
/**
* Construct a [TableReader] for reading over all resource state partitions.
*/
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index 1573726f..3885c931 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -50,6 +50,10 @@ public class BitbrainsTraceFormat : TraceFormat {
.enable(CsvParser.Feature.ALLOW_COMMENTS)
.enable(CsvParser.Feature.TRIM_SPACES)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -90,6 +94,10 @@ public class BitbrainsTraceFormat : TraceFormat {
}
}
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
/**
* Construct a [TableReader] for reading over all resource state partitions.
*/
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index 0f7b9d6e..d4287420 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -45,6 +45,10 @@ public class GwfTraceFormat : TraceFormat {
.enable(CsvParser.Feature.ALLOW_COMMENTS)
.enable(CsvParser.Feature.TRIM_SPACES)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -71,4 +75,8 @@ public class GwfTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
new file mode 100644
index 00000000..15a8cb85
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.trace.*
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [TableWriter] implementation for the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceStateTableWriter(
+ private val writer: ParquetWriter<GenericRecord>,
+ private val schema: Schema
+) : TableWriter {
+ /**
+ * The current builder for the record that is being written.
+ */
+ private var builder: GenericRecordBuilder? = null
+
+ /**
+ * The fields belonging to the resource state schema.
+ */
+ private val fields = schema.fields
+
+ override fun startRow() {
+ builder = GenericRecordBuilder(schema)
+ }
+
+ override fun endRow() {
+ val builder = checkNotNull(builder) { "No active row" }
+ this.builder = null
+
+ val record = builder.build()
+ val id = record[COL_ID] as String
+ val timestamp = record[COL_TIMESTAMP] as Long
+
+ check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+
+ writer.write(record)
+
+ lastId = id
+ lastTimestamp = timestamp
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ val schema = schema
+ return when (column) {
+ RESOURCE_ID -> schema.getField("id").pos()
+ RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ RESOURCE_STATE_DURATION -> schema.getField("duration").pos()
+ RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ else -> -1
+ }
+ }
+
+ override fun set(index: Int, value: Any) {
+ val builder = checkNotNull(builder) { "No active row" }
+
+ builder.set(
+ fields[index],
+ when (index) {
+ COL_TIMESTAMP -> (value as Instant).toEpochMilli()
+ COL_DURATION -> (value as Duration).toMillis()
+ else -> value
+ }
+ )
+ }
+
+ override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+
+ override fun setInt(index: Int, value: Int) = set(index, value)
+
+ override fun setLong(index: Int, value: Long) = set(index, value)
+
+ override fun setDouble(index: Int, value: Double) = set(index, value)
+
+ override fun flush() {
+ // The underlying ParquetWriter does not expose an explicit flush; buffered rows are written per row group and on close()
+ }
+
+ override fun close() {
+ writer.close()
+ }
+
+ /**
+ * Last column values that are used to check for correct partitioning.
+ */
+ private var lastId: String? = null
+ private var lastTimestamp: Long = Long.MIN_VALUE
+
+ /**
+ * Columns with special behavior.
+ */
+ private val COL_ID = resolve(RESOURCE_ID)
+ private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP)
+ private val COL_DURATION = resolve(RESOURCE_STATE_DURATION)
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
new file mode 100644
index 00000000..9cc6ca7d
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.trace.*
+import java.time.Instant
+import kotlin.math.roundToLong
+
+/**
+ * A [TableWriter] implementation for the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceTableWriter(
+ private val writer: ParquetWriter<GenericRecord>,
+ private val schema: Schema
+) : TableWriter {
+ /**
+ * The current builder for the record that is being written.
+ */
+ private var builder: GenericRecordBuilder? = null
+
+ /**
+ * The fields belonging to the resource schema.
+ */
+ private val fields = schema.fields
+
+ override fun startRow() {
+ builder = GenericRecordBuilder(schema)
+ }
+
+ override fun endRow() {
+ val builder = checkNotNull(builder) { "No active row" }
+ this.builder = null
+ writer.write(builder.build())
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ val schema = schema
+ return when (column) {
+ RESOURCE_ID -> schema.getField("id").pos()
+ RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ else -> -1
+ }
+ }
+
+ override fun set(index: Int, value: Any) {
+ val builder = checkNotNull(builder) { "No active row" }
+ builder.set(
+ fields[index],
+ when (index) {
+ COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli()
+ COL_MEM_CAPACITY -> (value as Double).roundToLong()
+ else -> value
+ }
+ )
+ }
+
+ override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+
+ override fun setInt(index: Int, value: Int) = set(index, value)
+
+ override fun setLong(index: Int, value: Long) = set(index, value)
+
+ override fun setDouble(index: Int, value: Double) = set(index, value)
+
+ override fun flush() {
+ // The underlying ParquetWriter does not expose an explicit flush; buffered rows are written per row group and on close()
+ }
+
+ override fun close() {
+ writer.close()
+ }
+
+ /**
+ * Columns with special behavior.
+ */
+ private val COL_START_TIME = resolve(RESOURCE_START_TIME)
+ private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME)
+ private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY)
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 29818147..9b32f8fd 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -25,11 +25,16 @@ package org.opendc.trace.opendc
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalOutputFile
import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import java.nio.file.Files
import java.nio.file.Path
/**
@@ -41,6 +46,18 @@ public class OdcVmTraceFormat : TraceFormat {
*/
override val name: String = "opendc-vm"
+ override fun create(path: Path) {
+ // Construct directory containing the trace files
+ Files.createDirectory(path)
+
+ val tables = getTables(path)
+
+ for (table in tables) {
+ val writer = newWriter(path, table)
+ writer.close()
+ }
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -82,6 +99,32 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
+ override fun newWriter(path: Path, table: String): TableWriter {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val schema = RESOURCES_SCHEMA
+ val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet")))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ OdcVmResourceTableWriter(writer, schema)
+ }
+ TABLE_RESOURCE_STATES -> {
+ val schema = RESOURCE_STATES_SCHEMA
+ val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet")))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withDictionaryEncoding("id", true)
+ .withBloomFilterEnabled("id", true)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ OdcVmResourceStateTableWriter(writer, schema)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
public companion object {
/**
* Schema for the resources table in the trace.
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index 4cb7e49e..1fd076d5 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -36,6 +36,10 @@ import kotlin.io.path.bufferedReader
public class SwfTraceFormat : TraceFormat {
override val name: String = "swf"
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -65,4 +69,8 @@ public class SwfTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
}
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts
index 35190dba..14a0fc7c 100644
--- a/opendc-trace/opendc-trace-tools/build.gradle.kts
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -29,19 +29,18 @@ plugins {
}
application {
- mainClass.set("org.opendc.trace.tools.TraceConverterKt")
+ mainClass.set("org.opendc.trace.tools.TraceConverter")
}
dependencies {
api(platform(projects.opendcPlatform))
- implementation(projects.opendcTrace.opendcTraceParquet)
- implementation(projects.opendcTrace.opendcTraceOpendc)
- implementation(projects.opendcTrace.opendcTraceAzure)
- implementation(projects.opendcTrace.opendcTraceBitbrains)
-
+ implementation(projects.opendcTrace.opendcTraceApi)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
+ runtimeOnly(projects.opendcTrace.opendcTraceOpendc)
+ runtimeOnly(projects.opendcTrace.opendcTraceBitbrains)
+ runtimeOnly(projects.opendcTrace.opendcTraceAzure)
runtimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index cd5d287f..6fad43be 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -20,6 +20,7 @@
* SOFTWARE.
*/
+@file:JvmName("TraceConverter")
package org.opendc.trace.tools
import com.github.ajalt.clikt.core.CliktCommand
@@ -29,25 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.*
import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
-import org.opendc.trace.opendc.OdcVmTraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.File
+import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
-import kotlin.math.roundToLong
/**
* A script to convert a trace in text format into a Parquet trace.
*/
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
/**
* Represents the command for converting traces
@@ -74,11 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The input format of the trace.
*/
- private val format by option("-f", "--format", help = "input format of trace")
- .choice("bitbrains-ex", "bitbrains", "azure")
+ private val inputFormat by option("-f", "--input-format", help = "format of the input trace")
.required()
/**
+ * The format of the output trace.
+ */
+ private val outputFormat by option("--output-format", help = "format of output trace")
+ .default("opendc-vm")
+
+ /**
* The sampling options.
*/
private val samplingOptions by SamplingOptions().cooccurring()
@@ -94,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
- val trace = Trace.open(input, format = format)
+ val inputTrace = Trace.open(input, format = inputFormat)
+ val outputTrace = Trace.create(output, format = outputFormat)
logger.info { "Building resources table" }
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enablePageWriteChecksum()
- .build()
+ val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter()
- val selectedVms = metaWriter.use { convertResources(trace, it) }
+ val selectedVms = metaWriter.use { convertResources(inputTrace, it) }
if (selectedVms.isEmpty()) {
logger.warn { "No VMs selected" }
@@ -114,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
logger.info { "Wrote ${selectedVms.size} rows" }
logger.info { "Building resource states table" }
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withBloomFilterNDV("id", selectedVms.size.toLong())
- .enableValidation()
- .build()
+ val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter()
- val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+ val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) }
logger.info { "Wrote $statesCount rows" }
}
/**
* Convert the resources table for the trace.
*/
- private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+ private fun convertResources(trace: Trace, writer: TableWriter): Set<String> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
@@ -168,18 +158,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
continue
}
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
-
- builder["id"] = id
- builder["start_time"] = startTime
- builder["stop_time"] = stopTime
- builder["cpu_count"] = numCpus
- builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
-
logger.info { "Selecting VM $id" }
-
- writer.write(builder.build())
selectedVms.add(id)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
+ writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+ writer.setInt(RESOURCE_CPU_COUNT, numCpus)
+ writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
+ writer.endRow()
}
return selectedVms
@@ -188,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* Convert the resource states table for the trace.
*/
- private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+ private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
var hasNextRow = reader.nextRow()
@@ -231,15 +219,13 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
cpuCount == reader.getInt(RESOURCE_CPU_COUNT)
} while (shouldContinue)
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
-
- builder["id"] = id
- builder["timestamp"] = startTimestamp
- builder["duration"] = duration
- builder["cpu_count"] = cpuCount
- builder["cpu_usage"] = cpuUsage
-
- writer.write(builder.build())
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+ writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+ writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+ writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+ writer.endRow()
count++
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index 825c3d6d..c75e3cbb 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -39,6 +39,10 @@ public class WfFormatTraceFormat : TraceFormat {
override val name: String = "wfformat"
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -64,4 +68,8 @@ public class WfFormatTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index 2f17694f..ef88d295 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -35,6 +35,10 @@ import java.nio.file.Path
public class WtfTraceFormat : TraceFormat {
override val name: String = "wtf"
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
override fun getDetails(path: Path, table: String): TableDetails {
@@ -67,4 +71,8 @@ public class WtfTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
}