summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-05-07 12:33:39 +0200
committerGitHub <noreply@github.com>2024-05-07 12:33:39 +0200
commitad20465a5df47b49561bb0afbdda5cd65c5da4b8 (patch)
tree268f0fde5924b71ca2750dbbbba4cbe24c361f12 /opendc-trace/opendc-trace-api/src/main
parent7c0691eb6c348d2e49da3ef354b652cf26604905 (diff)
Revamped failure models (#228)
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FailureColumns.kt40
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt8
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTableReader.kt144
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTraceFormat.kt88
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureFragment.kt32
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt98
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt94
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt2
9 files changed, 505 insertions, 5 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FailureColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FailureColumns.kt
new file mode 100644
index 00000000..3f653041
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FailureColumns.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureColumns")
+
+package org.opendc.trace.conv
+
+/**
+ * A column containing the task identifier.
+ */
+public const val FAILURE_INTERVAL: String = "failure_interval"
+
+/**
+ * A column containing the task identifier.
+ */
+public const val FAILURE_DURATION: String = "failure_duration"
+
+/**
+ * A column containing the task identifier.
+ */
+public const val FAILURE_INTENSITY: String = "failure_intensity"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
index 9b8fc6cf..d4019f73 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
@@ -49,4 +49,6 @@ public const val TABLE_RESOURCE_STATES: String = "resource_states"
*/
public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups"
-public const val TABLE_CARBON_INTENSITY: String = "carbon_intensities"
+public const val TABLE_CARBON_INTENSITIES: String = "carbon_intensities"
+
+public const val TABLE_FAILURES: String = "failures"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
index 0daa1297..d8adc739 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
@@ -28,7 +28,7 @@ import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
-import org.opendc.trace.conv.TABLE_CARBON_INTENSITY
+import org.opendc.trace.conv.TABLE_CARBON_INTENSITIES
import org.opendc.trace.formats.carbon.parquet.CarbonIntensityReadSupport
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
@@ -45,14 +45,14 @@ public class CarbonTraceFormat : TraceFormat {
throw UnsupportedOperationException("Writing not supported for this format")
}
- override fun getTables(path: Path): List<String> = listOf(TABLE_CARBON_INTENSITY)
+ override fun getTables(path: Path): List<String> = listOf(TABLE_CARBON_INTENSITIES)
override fun getDetails(
path: Path,
table: String,
): TableDetails {
return when (table) {
- TABLE_CARBON_INTENSITY ->
+ TABLE_CARBON_INTENSITIES ->
TableDetails(
listOf(
TableColumn(CARBON_INTENSITY_TIMESTAMP, TableColumnType.Instant),
@@ -69,7 +69,7 @@ public class CarbonTraceFormat : TraceFormat {
projection: List<String>?,
): TableReader {
return when (table) {
- TABLE_CARBON_INTENSITY -> {
+ TABLE_CARBON_INTENSITIES -> {
val reader = LocalParquetReader(path, CarbonIntensityReadSupport(projection))
CarbonTableReader(reader)
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTableReader.kt
new file mode 100644
index 00000000..a1c10bd0
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTableReader.kt
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.failure
+
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.FAILURE_DURATION
+import org.opendc.trace.conv.FAILURE_INTENSITY
+import org.opendc.trace.conv.FAILURE_INTERVAL
+import org.opendc.trace.formats.failure.parquet.FailureFragment
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] implementation for the WTF format.
+ */
+internal class FailureTableReader(private val reader: LocalParquetReader<FailureFragment>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: FailureFragment? = null
+
+ override fun nextRow(): Boolean {
+ try {
+ val record = reader.read()
+ this.record = record
+
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
+ }
+ }
+
+ private val colFailureInterval = 0
+ private val colFailureDuration = 1
+ private val colFailureIntensity = 2
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ FAILURE_INTERVAL -> colFailureInterval
+ FAILURE_DURATION -> colFailureDuration
+ FAILURE_INTENSITY -> colFailureIntensity
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in colFailureInterval..colFailureIntensity) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(index: Int): Long {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colFailureInterval -> record.failureInterval
+ colFailureDuration -> record.failureDuration
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colFailureIntensity -> record.failureIntensity
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getString(index: Int): String {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDuration(index: Int): Duration {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTraceFormat.kt
new file mode 100644
index 00000000..892216a0
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/FailureTraceFormat.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.failure
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.FAILURE_DURATION
+import org.opendc.trace.conv.FAILURE_INTENSITY
+import org.opendc.trace.conv.FAILURE_INTERVAL
+import org.opendc.trace.conv.TABLE_FAILURES
+import org.opendc.trace.formats.failure.parquet.FailureReadSupport
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * A [TraceFormat] implementation for the Failure Intensity trace.
+ */
+public class FailureTraceFormat : TraceFormat {
+ override val name: String = "failure"
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_FAILURES)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_FAILURES ->
+ TableDetails(
+ listOf(
+ TableColumn(FAILURE_INTERVAL, TableColumnType.Long),
+ TableColumn(FAILURE_DURATION, TableColumnType.Long),
+ TableColumn(FAILURE_INTENSITY, TableColumnType.Double),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_FAILURES -> {
+ val reader = LocalParquetReader(path, FailureReadSupport(projection))
+ FailureTableReader(reader)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureFragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureFragment.kt
new file mode 100644
index 00000000..49f43aa1
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureFragment.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.failure.parquet
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class FailureFragment(
+ val failureInterval: Long,
+ val failureDuration: Long,
+ val failureIntensity: Double,
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt
new file mode 100644
index 00000000..d49f86c6
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.failure.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.trace.conv.FAILURE_DURATION
+import org.opendc.trace.conv.FAILURE_INTENSITY
+import org.opendc.trace.conv.FAILURE_INTERVAL
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
+ */
+internal class FailureReadSupport(private val projection: List<String>?) : ReadSupport<FailureFragment>() {
+ /**
+ * Mapping of table columns to their Parquet column names.
+ */
+ private val colMap =
+ mapOf(
+ FAILURE_INTERVAL to "interval",
+ FAILURE_DURATION to "duration",
+ FAILURE_INTENSITY to "intensity",
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+ for (col in projection) {
+ val fieldName = colMap[col] ?: continue
+ addField(fieldByName.getValue(fieldName))
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<FailureFragment> = FailureRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema for the "tasks" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("failure_interval"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("failure_duration"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("failure_intensity"),
+ )
+ .named("failure_fragment")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt
new file mode 100644
index 00000000..5a00f8c9
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.failure.parquet
+
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+
+/**
+ * A [RecordMaterializer] for [Task] records.
+ */
+internal class FailureRecordMaterializer(schema: MessageType) : RecordMaterializer<FailureFragment>() {
+ /**
+ * State of current record being read.
+ */
+ private var localFailureInterval: Long = 0L
+ private var localFailureDuration: Long = 0L
+ private var localFailureIntensity: Double = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "failure_interval" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localFailureInterval = value
+ }
+ }
+ "failure_duration" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localFailureDuration = value
+ }
+ }
+ "failure_intensity" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localFailureIntensity = value
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ localFailureInterval = 0L
+ localFailureDuration = 0L
+ localFailureIntensity = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): FailureFragment =
+ FailureFragment(
+ localFailureInterval,
+ localFailureDuration,
+ localFailureIntensity,
+ )
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index 67df667b..e586f90a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -27,6 +27,7 @@ import org.opendc.trace.TableWriter
import org.opendc.trace.azure.AzureTraceFormat
import org.opendc.trace.bitbrains.BitbrainsTraceFormat
import org.opendc.trace.formats.carbon.CarbonTraceFormat
+import org.opendc.trace.formats.failure.FailureTraceFormat
import org.opendc.trace.formats.opendc.OdcVmTraceFormat
import org.opendc.trace.gwf.GwfTraceFormat
import org.opendc.trace.swf.SwfTraceFormat
@@ -124,6 +125,7 @@ public interface TraceFormat {
"azure" -> AzureTraceFormat()
"bitbrains" -> BitbrainsTraceFormat()
"carbon" -> CarbonTraceFormat()
+ "failure" -> FailureTraceFormat()
"gwf" -> GwfTraceFormat()
"opendc-vm" -> OdcVmTraceFormat()
"swf" -> SwfTraceFormat()