summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt35
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt140
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt86
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt33
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt95
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt86
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt14
9 files changed, 486 insertions, 11 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt
new file mode 100644
index 00000000..de74c4fd
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("CarbonIntensityColumns")
+
+package org.opendc.trace.conv
+
+/**
+ * A column containing the task identifier.
+ */
+public const val CARBON_INTENSITY_TIMESTAMP: String = "timestamp"
+
+/**
+ * A column containing the task identifier.
+ */
+public const val CARBON_INTENSITY_VALUE: String = "carbon_intensity"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index 046dd13d..baaa0690 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -43,6 +43,12 @@ public val resourceClusterID: String = "cluster_id"
public val resourceStartTime: String = "start_time"
/**
+ * Start time for the resource.
+ */
+@JvmField
+public val resourceCarbonIntensity: String = "carbon_intensity"
+
+/**
* End time for the resource.
*/
@JvmField
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
index 495628da..9b8fc6cf 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
@@ -48,3 +48,5 @@ public const val TABLE_RESOURCE_STATES: String = "resource_states"
* A table containing the groups of resources that interfere when run on the same execution platform.
*/
public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups"
+
+public const val TABLE_CARBON_INTENSITY: String = "carbon_intensities"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt
new file mode 100644
index 00000000..226c8806
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.carbon
+
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
+import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
+import org.opendc.trace.formats.carbon.parquet.CarbonIntensityFragment
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] implementation for the WTF format.
+ */
+internal class CarbonTableReader(private val reader: LocalParquetReader<CarbonIntensityFragment>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: CarbonIntensityFragment? = null
+
+ override fun nextRow(): Boolean {
+ try {
+ val record = reader.read()
+ this.record = record
+
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
+ }
+ }
+
+ private val colTimestamp = 0
+ private val colCarbonIntensity = 1
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ CARBON_INTENSITY_TIMESTAMP -> colTimestamp
+ CARBON_INTENSITY_VALUE -> colCarbonIntensity
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in colTimestamp..colCarbonIntensity) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colCarbonIntensity -> record.carbonIntensity
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getString(index: Int): String {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colTimestamp -> record.timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
new file mode 100644
index 00000000..0daa1297
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.carbon
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
+import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
+import org.opendc.trace.conv.TABLE_CARBON_INTENSITY
+import org.opendc.trace.formats.carbon.parquet.CarbonIntensityReadSupport
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * A [TraceFormat] implementation for the Carbon Intensity trace.
+ */
+public class CarbonTraceFormat : TraceFormat {
+ override val name: String = "carbon_intensity"
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_CARBON_INTENSITY)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_CARBON_INTENSITY ->
+ TableDetails(
+ listOf(
+ TableColumn(CARBON_INTENSITY_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(CARBON_INTENSITY_VALUE, TableColumnType.Double),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_CARBON_INTENSITY -> {
+ val reader = LocalParquetReader(path, CarbonIntensityReadSupport(projection))
+ CarbonTableReader(reader)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt
new file mode 100644
index 00000000..3211cb6c
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.carbon.parquet
+
+import java.time.Instant
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class CarbonIntensityFragment(
+ val timestamp: Instant,
+ val carbonIntensity: Double,
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt
new file mode 100644
index 00000000..2f4eac05
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.carbon.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
+import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
+ */
+internal class CarbonIntensityReadSupport(private val projection: List<String>?) : ReadSupport<CarbonIntensityFragment>() {
+ /**
+ * Mapping of table columns to their Parquet column names.
+ */
+ private val colMap =
+ mapOf(
+ CARBON_INTENSITY_TIMESTAMP to "timestamp",
+ CARBON_INTENSITY_VALUE to "carbon_intensity",
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+ for (col in projection) {
+ val fieldName = colMap[col] ?: continue
+ addField(fieldByName.getValue(fieldName))
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<CarbonIntensityFragment> = CarbonIntensityRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema for the "tasks" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("carbon_intensity"),
+ )
+ .named("carbon_intensity_fragment")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt
new file mode 100644
index 00000000..f5d68f22
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.carbon.parquet
+
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [Task] records.
+ */
+internal class CarbonIntensityRecordMaterializer(schema: MessageType) : RecordMaterializer<CarbonIntensityFragment>() {
+ /**
+ * State of current record being read.
+ */
+ private var localTimestamp: Instant = Instant.MIN
+ private var localCarbonIntensity: Double = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "timestamp" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localTimestamp = Instant.ofEpochMilli(value)
+ }
+ }
+ "carbon_intensity" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localCarbonIntensity = value
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ localTimestamp = Instant.MIN
+ localCarbonIntensity = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): CarbonIntensityFragment =
+ CarbonIntensityFragment(
+ localTimestamp,
+ localCarbonIntensity,
+ )
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index 89cac608..67df667b 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -26,6 +26,7 @@ import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import org.opendc.trace.azure.AzureTraceFormat
import org.opendc.trace.bitbrains.BitbrainsTraceFormat
+import org.opendc.trace.formats.carbon.CarbonTraceFormat
import org.opendc.trace.formats.opendc.OdcVmTraceFormat
import org.opendc.trace.gwf.GwfTraceFormat
import org.opendc.trace.swf.SwfTraceFormat
@@ -114,26 +115,17 @@ public interface TraceFormat {
return ServiceLoader.load(TraceFormat::class.java)
}
-// /**
-// * Obtain a [TraceFormat] implementation by [name].
-// */
-// @JvmStatic
-// public fun byName(name: String): TraceFormat? {
-//
-// val loader = ServiceLoader.load(TraceFormat::class.java)
-// return loader.find { it.name == name }
-// }
-
/**
* Obtain a [TraceFormat] implementation by [name].
*/
@JvmStatic
public fun byName(name: String): TraceFormat? {
return when (name) {
- "opendc-vm" -> OdcVmTraceFormat()
"azure" -> AzureTraceFormat()
"bitbrains" -> BitbrainsTraceFormat()
+ "carbon" -> CarbonTraceFormat()
"gwf" -> GwfTraceFormat()
+ "opendc-vm" -> OdcVmTraceFormat()
"swf" -> SwfTraceFormat()
"wfformat" -> WfFormatTraceFormat()
"wtf" -> WtfTraceFormat()