summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-04-16 09:29:53 +0200
committerGitHub <noreply@github.com>2024-04-16 09:29:53 +0200
commitfff89d25bd3c7b874e68261d21695c473c30ed7d (patch)
treebe368dd745e8119dbdefd9cd0b012c7ff9080a7a /opendc-trace/opendc-trace-opendc/src/main
parenta7b0afbb5b7059274962ade234a50240677008fd (diff)
Revamped the trace system. All TraceFormat files are now in the api m… (#216)
* Revamped the trace system. All TraceFormat files are now in the api module. This fixes some problems with not being able to use types of traces * applied spotless
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src/main')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt225
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt192
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt166
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt209
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt168
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt197
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt190
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt37
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt159
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt127
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt34
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt149
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt114
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt112
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt121
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
16 files changed, 0 insertions, 2201 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
deleted file mode 100644
index 7bf48f1a..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.core.JsonParser
-import com.fasterxml.jackson.core.JsonToken
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
-import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.util.convertTo
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] implementation for the OpenDC VM interference JSON format.
- */
-internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader {
- /**
- * A flag to indicate whether a single row has been read already.
- */
- private var isStarted = false
-
- override fun nextRow(): Boolean {
- if (!isStarted) {
- isStarted = true
-
- parser.nextToken()
-
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
- }
- }
-
- return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) {
- parser.close()
- reset()
- false
- } else {
- parseGroup(parser)
- true
- }
- }
-
- private val colMembers = 0
- private val colTarget = 1
- private val colScore = 2
-
- private val typeMembers = TableColumnType.Set(TableColumnType.String)
-
- override fun resolve(name: String): Int {
- return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> colMembers
- INTERFERENCE_GROUP_TARGET -> colTarget
- INTERFERENCE_GROUP_SCORE -> colScore
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- return when (index) {
- colMembers, colTarget, colScore -> false
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getInt(index: Int): Int {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getDouble(index: Int): Double {
- checkActive()
- return when (index) {
- colTarget -> targetLoad
- colScore -> score
- else -> throw IllegalArgumentException("Invalid column $index")
- }
- }
-
- override fun getString(index: Int): String? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getInstant(index: Int): Instant? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- checkActive()
- return when (index) {
- colMembers -> typeMembers.convertTo(members, elementType)
- else -> throw IllegalArgumentException("Invalid column $index")
- }
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun close() {
- parser.close()
- }
-
- private var members = emptySet<String>()
- private var targetLoad = Double.POSITIVE_INFINITY
- private var score = 1.0
-
- /**
- * Helper method to check if the reader is active.
- */
- private fun checkActive() {
- check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
- }
-
- /**
- * Reset the state.
- */
- private fun reset() {
- members = emptySet()
- targetLoad = Double.POSITIVE_INFINITY
- score = 1.0
- }
-
- /**
- * Parse a group an interference JSON file.
- */
- private fun parseGroup(parser: JsonParser) {
- var targetLoad = Double.POSITIVE_INFINITY
- var score = 1.0
- val members = mutableSetOf<String>()
-
- if (!parser.isExpectedStartObjectToken) {
- throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_OBJECT) {
- when (parser.currentName) {
- "vms" -> parseGroupMembers(parser, members)
- "minServerLoad" -> targetLoad = parser.doubleValue
- "performanceScore" -> score = parser.doubleValue
- }
- }
-
- this.members = members
- this.targetLoad = targetLoad
- this.score = score
- }
-
- /**
- * Parse the members of a group.
- */
- private fun parseGroupMembers(
- parser: JsonParser,
- members: MutableSet<String>,
- ) {
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_ARRAY) {
- if (parser.currentToken() != JsonToken.VALUE_STRING) {
- throw JsonParseException(parser, "Expected string value for group member")
- }
-
- members.add(parser.text)
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
deleted file mode 100644
index 93f5a976..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import com.fasterxml.jackson.core.JsonGenerator
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
-import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableWriter] implementation for the OpenDC VM interference JSON format.
- */
-internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter {
- /**
- * A flag to indicate whether a row has been started.
- */
- private var isRowActive = false
-
- init {
- generator.writeStartArray()
- }
-
- override fun startRow() {
- // Reset state
- members = emptySet()
- targetLoad = Double.POSITIVE_INFINITY
- score = 1.0
-
- // Mark row as active
- isRowActive = true
- }
-
- override fun endRow() {
- check(isRowActive) { "No active row" }
-
- generator.writeStartObject()
- generator.writeArrayFieldStart("vms")
- for (member in members) {
- generator.writeString(member)
- }
- generator.writeEndArray()
- generator.writeNumberField("minServerLoad", targetLoad)
- generator.writeNumberField("performanceScore", score)
- generator.writeEndObject()
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> colMembers
- INTERFERENCE_GROUP_TARGET -> colTarget
- INTERFERENCE_GROUP_SCORE -> colScore
- else -> -1
- }
- }
-
- override fun setBoolean(
- index: Int,
- value: Boolean,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setInt(
- index: Int,
- value: Int,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setLong(
- index: Int,
- value: Long,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setFloat(
- index: Int,
- value: Float,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setDouble(
- index: Int,
- value: Double,
- ) {
- check(isRowActive) { "No active row" }
-
- when (index) {
- colTarget -> targetLoad = (value as Number).toDouble()
- colScore -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column $index")
- }
- }
-
- override fun setString(
- index: Int,
- value: String,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setUUID(
- index: Int,
- value: UUID,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setInstant(
- index: Int,
- value: Instant,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setDuration(
- index: Int,
- value: Duration,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> setList(
- index: Int,
- value: List<T>,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> setSet(
- index: Int,
- value: Set<T>,
- ) {
- check(isRowActive) { "No active row" }
-
- @Suppress("UNCHECKED_CAST")
- when (index) {
- colMembers -> members = value as Set<String>
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun <K, V> setMap(
- index: Int,
- value: Map<K, V>,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun flush() {
- generator.flush()
- }
-
- override fun close() {
- generator.writeEndArray()
- generator.close()
- }
-
- private val colMembers = 0
- private val colTarget = 1
- private val colScore = 2
-
- private var members = emptySet<String>()
- private var targetLoad = Double.POSITIVE_INFINITY
- private var score = 1.0
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
deleted file mode 100644
index 8e54f2b0..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.opendc.parquet.ResourceState
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] implementation for the OpenDC virtual machine trace format.
- */
-internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
- /**
- * The current record.
- */
- private var record: ResourceState? = null
-
- override fun nextRow(): Boolean {
- try {
- val record = reader.read()
- this.record = record
-
- return record != null
- } catch (e: Throwable) {
- this.record = null
- throw e
- }
- }
-
- private val colID = 0
- private val colTimestamp = 1
- private val colDuration = 2
- private val colCpuCount = 3
- private val colCpuUsage = 4
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceStateTimestamp -> colTimestamp
- resourceStateDuration -> colDuration
- resourceCpuCount -> colCpuCount
- resourceStateCpuUsage -> colCpuUsage
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colCpuUsage) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun getInt(index: Int): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colCpuCount -> record.cpuCount
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun getDouble(index: Int): Double {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colCpuUsage -> record.cpuUsage
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun getString(index: Int): String {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colID -> record.id
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun getInstant(index: Int): Instant {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colTimestamp -> record.timestamp
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun getDuration(index: Int): Duration {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colDuration -> record.duration
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun close() {
- reader.close()
- }
-
- override fun toString(): String = "OdcVmResourceStateTableReader"
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
deleted file mode 100644
index 01cd13c8..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.opendc.parquet.ResourceState
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableWriter] implementation for the OpenDC virtual machine trace format.
- */
-internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
- /**
- * The current state for the record that is being written.
- */
- private var localIsActive = false
- private var localID: String = ""
- private var localTimestamp: Instant = Instant.MIN
- private var localDuration: Duration = Duration.ZERO
- private var localCpuCount: Int = 0
- private var localCpuUsage: Double = Double.NaN
-
- override fun startRow() {
- localIsActive = true
- localID = ""
- localTimestamp = Instant.MIN
- localDuration = Duration.ZERO
- localCpuCount = 0
- localCpuUsage = Double.NaN
- }
-
- override fun endRow() {
- check(localIsActive) { "No active row" }
- localIsActive = false
-
- check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
-
- writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage))
-
- lastId = localID
- lastTimestamp = localTimestamp
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceStateTimestamp -> colTimestamp
- resourceStateDuration -> colDuration
- resourceCpuCount -> colCpuCount
- resourceStateCpuUsage -> colCpuUsage
- else -> -1
- }
- }
-
- override fun setBoolean(
- index: Int,
- value: Boolean,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setInt(
- index: Int,
- value: Int,
- ) {
- check(localIsActive) { "No active row" }
- when (index) {
- colCpuCount -> localCpuCount = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun setLong(
- index: Int,
- value: Long,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setFloat(
- index: Int,
- value: Float,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setDouble(
- index: Int,
- value: Double,
- ) {
- check(localIsActive) { "No active row" }
- when (index) {
- colCpuUsage -> localCpuUsage = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun setString(
- index: Int,
- value: String,
- ) {
- check(localIsActive) { "No active row" }
-
- when (index) {
- colID -> localID = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun setUUID(
- index: Int,
- value: UUID,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setInstant(
- index: Int,
- value: Instant,
- ) {
- check(localIsActive) { "No active row" }
-
- when (index) {
- colTimestamp -> localTimestamp = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun setDuration(
- index: Int,
- value: Duration,
- ) {
- check(localIsActive) { "No active row" }
-
- when (index) {
- colDuration -> localDuration = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun <T> setList(
- index: Int,
- value: List<T>,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun <T> setSet(
- index: Int,
- value: Set<T>,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun <K, V> setMap(
- index: Int,
- value: Map<K, V>,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun flush() {
- // Not available
- }
-
- override fun close() {
- writer.close()
- }
-
- /**
- * Last column values that are used to check for correct partitioning.
- */
- private var lastId: String? = null
- private var lastTimestamp: Instant = Instant.MAX
-
- private val colID = 0
- private val colTimestamp = 1
- private val colDuration = 2
- private val colCpuCount = 3
- private val colCpuUsage = 4
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
deleted file mode 100644
index 195929aa..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStartTime
-import org.opendc.trace.conv.resourceStopTime
-import org.opendc.trace.opendc.parquet.Resource
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
- */
-internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
- /**
- * The current record.
- */
- private var record: Resource? = null
-
- override fun nextRow(): Boolean {
- try {
- val record = reader.read()
- this.record = record
-
- return record != null
- } catch (e: Throwable) {
- this.record = null
- throw e
- }
- }
-
- private val colID = 0
- private val colStartTime = 1
- private val colStopTime = 2
- private val colCpuCount = 3
- private val colCpuCapacity = 4
- private val colMemCapacity = 5
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceStartTime -> colStartTime
- resourceStopTime -> colStopTime
- resourceCpuCount -> colCpuCount
- resourceCpuCapacity -> colCpuCapacity
- resourceMemCapacity -> colMemCapacity
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colMemCapacity) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colCpuCount -> record.cpuCount
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colCpuCapacity -> record.cpuCapacity
- colMemCapacity -> record.memCapacity
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getString(index: Int): String {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colID -> record.id
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colStartTime -> record.startTime
- colStopTime -> record.stopTime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- reader.close()
- }
-
- override fun toString(): String = "OdcVmResourceTableReader"
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
deleted file mode 100644
index 5bbc2f3f..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStartTime
-import org.opendc.trace.conv.resourceStopTime
-import org.opendc.trace.opendc.parquet.Resource
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableWriter] implementation for the OpenDC virtual machine trace format.
- */
-internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
- /**
- * The current state for the record that is being written.
- */
- private var localIsActive = false
- private var localId: String = ""
- private var localStartTime: Instant = Instant.MIN
- private var localStopTime: Instant = Instant.MIN
- private var localCpuCount: Int = 0
- private var localCpuCapacity: Double = Double.NaN
- private var localMemCapacity: Double = Double.NaN
-
- override fun startRow() {
- localIsActive = true
- localId = ""
- localStartTime = Instant.MIN
- localStopTime = Instant.MIN
- localCpuCount = 0
- localCpuCapacity = Double.NaN
- localMemCapacity = Double.NaN
- }
-
- override fun endRow() {
- check(localIsActive) { "No active row" }
- localIsActive = false
- writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity))
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceStartTime -> colStartTime
- resourceStopTime -> colStopTime
- resourceCpuCount -> colCpuCount
- resourceCpuCapacity -> colCpuCapacity
- resourceMemCapacity -> colMemCapacity
- else -> -1
- }
- }
-
- override fun setBoolean(
- index: Int,
- value: Boolean,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setInt(
- index: Int,
- value: Int,
- ) {
- check(localIsActive) { "No active row" }
- when (index) {
- colCpuCount -> localCpuCount = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun setLong(
- index: Int,
- value: Long,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setFloat(
- index: Int,
- value: Float,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setDouble(
- index: Int,
- value: Double,
- ) {
- check(localIsActive) { "No active row" }
- when (index) {
- colCpuCapacity -> localCpuCapacity = value
- colMemCapacity -> localMemCapacity = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
- }
-
- override fun setString(
- index: Int,
- value: String,
- ) {
- check(localIsActive) { "No active row" }
- when (index) {
- colID -> localId = value
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun setUUID(
- index: Int,
- value: UUID,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun setInstant(
- index: Int,
- value: Instant,
- ) {
- check(localIsActive) { "No active row" }
- when (index) {
- colStartTime -> localStartTime = value
- colStopTime -> localStopTime = value
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun setDuration(
- index: Int,
- value: Duration,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun <T> setList(
- index: Int,
- value: List<T>,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun <T> setSet(
- index: Int,
- value: Set<T>,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun <K, V> setMap(
- index: Int,
- value: Map<K, V>,
- ) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
- }
-
- override fun flush() {
- // Not available
- }
-
- override fun close() {
- writer.close()
- }
-
- private val colID = 0
- private val colStartTime = 1
- private val colStopTime = 2
- private val colCpuCount = 3
- private val colCpuCapacity = 4
- private val colMemCapacity = 5
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
deleted file mode 100644
index 9abe872f..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc
-
-import com.fasterxml.jackson.core.JsonEncoding
-import com.fasterxml.jackson.core.JsonFactory
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetFileWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
-import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
-import org.opendc.trace.conv.TABLE_RESOURCES
-import org.opendc.trace.conv.TABLE_RESOURCE_STATES
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStartTime
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.conv.resourceStopTime
-import org.opendc.trace.opendc.parquet.ResourceReadSupport
-import org.opendc.trace.opendc.parquet.ResourceStateReadSupport
-import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport
-import org.opendc.trace.opendc.parquet.ResourceWriteSupport
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.nio.file.Files
-import java.nio.file.Path
-import kotlin.io.path.exists
-
-/**
- * A [TraceFormat] implementation of the OpenDC virtual machine trace format.
- */
-public class OdcVmTraceFormat : TraceFormat {
- /**
- * A [JsonFactory] that is used to parse the JSON-based interference model.
- */
- private val jsonFactory = JsonFactory()
-
- /**
- * The name of this trace format.
- */
- override val name: String = "opendc-vm"
-
- override fun create(path: Path) {
- // Construct directory containing the trace files
- Files.createDirectories(path)
-
- val tables = getTables(path)
-
- for (table in tables) {
- val writer = newWriter(path, table)
- writer.close()
- }
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_RESOURCES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceStartTime, TableColumnType.Instant),
- TableColumn(resourceStopTime, TableColumnType.Instant),
- TableColumn(resourceCpuCount, TableColumnType.Int),
- TableColumn(resourceCpuCapacity, TableColumnType.Double),
- TableColumn(resourceMemCapacity, TableColumnType.Double),
- ),
- )
- TABLE_RESOURCE_STATES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceStateTimestamp, TableColumnType.Instant),
- TableColumn(resourceStateDuration, TableColumnType.Duration),
- TableColumn(resourceCpuCount, TableColumnType.Int),
- TableColumn(resourceStateCpuUsage, TableColumnType.Double),
- ),
- )
- TABLE_INTERFERENCE_GROUPS ->
- TableDetails(
- listOf(
- TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
- TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_RESOURCES -> {
- val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
- OdcVmResourceTableReader(reader)
- }
- TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
- OdcVmResourceStateTableReader(reader)
- }
- TABLE_INTERFERENCE_GROUPS -> {
- val modelPath = path.resolve("interference-model.json")
- val parser =
- if (modelPath.exists()) {
- jsonFactory.createParser(modelPath.toFile())
- } else {
- jsonFactory.createParser("[]") // If model does not exist, return empty model
- }
-
- OdcVmInterferenceJsonTableReader(parser)
- }
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- return when (table) {
- TABLE_RESOURCES -> {
- val writer =
- LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
- OdcVmResourceTableWriter(writer)
- }
- TABLE_RESOURCE_STATES -> {
- val writer =
- LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
- OdcVmResourceStateTableWriter(writer)
- }
- TABLE_INTERFERENCE_GROUPS -> {
- val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
- OdcVmInterferenceJsonTableWriter(generator)
- }
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
deleted file mode 100644
index 13eefe72..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import java.time.Instant
-
-/**
- * A description of a resource in a trace.
- */
-internal data class Resource(
- val id: String,
- val startTime: Instant,
- val stopTime: Instant,
- val cpuCount: Int,
- val cpuCapacity: Double,
- val memCapacity: Double,
-)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
deleted file mode 100644
index 8bada02e..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.InitContext
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStartTime
-import org.opendc.trace.conv.resourceStopTime
-
-/**
- * A [ReadSupport] instance for [Resource] objects.
- */
-internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() {
- /**
- * Mapping from field names to [TableColumn]s.
- */
- private val fieldMap =
- mapOf(
- "id" to resourceID,
- "submissionTime" to resourceStartTime,
- "start_time" to resourceStartTime,
- "endTime" to resourceStopTime,
- "stop_time" to resourceStopTime,
- "maxCores" to resourceCpuCount,
- "cpu_count" to resourceCpuCount,
- "cpu_capacity" to resourceCpuCapacity,
- "requiredMemory" to resourceMemCapacity,
- "mem_capacity" to resourceMemCapacity,
- )
-
- override fun init(context: InitContext): ReadContext {
- val projectedSchema =
- if (projection != null) {
- Types.buildMessage()
- .apply {
- val projectionSet = projection.toSet()
-
- for (field in READ_SCHEMA.fields) {
- val col = fieldMap[field.name] ?: continue
- if (col in projectionSet) {
- addField(field)
- }
- }
- }
- .named(READ_SCHEMA.name)
- } else {
- READ_SCHEMA
- }
-
- return ReadContext(projectedSchema)
- }
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext,
- ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema (version 2.0) for the "resources" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_0: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("submissionTime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("endTime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("maxCores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("requiredMemory"),
- )
- .named("resource")
-
- /**
- * Parquet read schema (version 2.1) for the "resources" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_1: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("start_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("stop_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- )
- .named("resource")
-
- /**
- * Parquet read schema for the "resources" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType =
- READ_SCHEMA_V2_0
- .union(READ_SCHEMA_V2_1)
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
deleted file mode 100644
index 6e2afa7a..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.Converter
-import org.apache.parquet.io.api.GroupConverter
-import org.apache.parquet.io.api.PrimitiveConverter
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
-import java.time.Instant
-
-/**
- * A [RecordMaterializer] for [Resource] records.
- */
-internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
- /**
- * State of current record being read.
- */
- private var localId = ""
- private var localStartTime = Instant.MIN
- private var localStopTime = Instant.MIN
- private var localCpuCount = 0
- private var localCpuCapacity = 0.0
- private var localMemCapacity = 0.0
-
- /**
- * Root converter for the record.
- */
- private val root =
- object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters =
- schema.fields.map { type ->
- when (type.name) {
- "id" ->
- object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- localId = value.toStringUsingUTF8()
- }
- }
- "start_time", "submissionTime" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localStartTime = Instant.ofEpochMilli(value)
- }
- }
- "stop_time", "endTime" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localStopTime = Instant.ofEpochMilli(value)
- }
- }
- "cpu_count", "maxCores" ->
- object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- localCpuCount = value
- }
- }
- "cpu_capacity" ->
- object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- localCpuCapacity = value
- }
- }
- "mem_capacity", "requiredMemory" ->
- object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- localMemCapacity = value
- }
-
- override fun addLong(value: Long) {
- localMemCapacity = value.toDouble()
- }
- }
- else -> error("Unknown column $type")
- }
- }
-
- override fun start() {
- localId = ""
- localStartTime = Instant.MIN
- localStopTime = Instant.MIN
- localCpuCount = 0
- localCpuCapacity = 0.0
- localMemCapacity = 0.0
- }
-
- override fun end() {}
-
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
-
- override fun getCurrentRecord(): Resource =
- Resource(
- localId,
- localStartTime,
- localStopTime,
- localCpuCount,
- localCpuCapacity,
- localMemCapacity,
- )
-
- override fun getRootConverter(): GroupConverter = root
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
deleted file mode 100644
index 483f444c..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import java.time.Duration
-import java.time.Instant
-
-internal class ResourceState(
- val id: String,
- val timestamp: Instant,
- val duration: Duration,
- val cpuCount: Int,
- val cpuUsage: Double,
-)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
deleted file mode 100644
index 21e206a9..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.InitContext
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateTimestamp
-
-/**
- * A [ReadSupport] instance for [ResourceState] objects.
- */
-internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() {
- /**
- * Mapping from field names to [TableColumn]s.
- */
- private val fieldMap =
- mapOf(
- "id" to resourceID,
- "time" to resourceStateTimestamp,
- "timestamp" to resourceStateTimestamp,
- "duration" to resourceStateDuration,
- "cores" to resourceCpuCount,
- "cpu_count" to resourceCpuCount,
- "cpuUsage" to resourceStateCpuUsage,
- "cpu_usage" to resourceStateCpuUsage,
- )
-
- override fun init(context: InitContext): ReadContext {
- val projectedSchema =
- if (projection != null) {
- Types.buildMessage()
- .apply {
- val projectionSet = projection.toSet()
-
- for (field in READ_SCHEMA.fields) {
- val col = fieldMap[field.name] ?: continue
- if (col in projectionSet) {
- addField(field)
- }
- }
- }
- .named(READ_SCHEMA.name)
- } else {
- READ_SCHEMA
- }
-
- return ReadContext(projectedSchema)
- }
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext,
- ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema (version 2.0) for the "resource states" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_0: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpuUsage"),
- )
- .named("resource_state")
-
- /**
- * Parquet read schema (version 2.1) for the "resource states" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_1: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage"),
- )
- .named("resource_state")
-
- /**
- * Parquet read schema for the "resource states" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
deleted file mode 100644
index 72d24e78..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.Converter
-import org.apache.parquet.io.api.GroupConverter
-import org.apache.parquet.io.api.PrimitiveConverter
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
-import java.time.Duration
-import java.time.Instant
-
-/**
- * A [RecordMaterializer] for [ResourceState] records.
- */
-internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
- /**
- * State of current record being read.
- */
- private var localId = ""
- private var localTimestamp = Instant.MIN
- private var localDuration = Duration.ZERO
- private var localCpuCount = 0
- private var localCpuUsage = 0.0
-
- /**
- * Root converter for the record.
- */
- private val root =
- object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters =
- schema.fields.map { type ->
- when (type.name) {
- "id" ->
- object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- localId = value.toStringUsingUTF8()
- }
- }
- "timestamp", "time" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localTimestamp = Instant.ofEpochMilli(value)
- }
- }
- "duration" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localDuration = Duration.ofMillis(value)
- }
- }
- "cpu_count", "cores" ->
- object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- localCpuCount = value
- }
- }
- "cpu_usage", "cpuUsage" ->
- object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- localCpuUsage = value
- }
- }
- "flops" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- // Ignore to support v1 format
- }
- }
- else -> error("Unknown column $type")
- }
- }
-
- override fun start() {
- localId = ""
- localTimestamp = Instant.MIN
- localDuration = Duration.ZERO
- localCpuCount = 0
- localCpuUsage = 0.0
- }
-
- override fun end() {}
-
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
-
- override fun getCurrentRecord(): ResourceState = ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage)
-
- override fun getRootConverter(): GroupConverter = root
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
deleted file mode 100644
index 2a6d8c12..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-
-/**
- * Support for writing [Resource] instances to Parquet format.
- */
-internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
- /**
- * The current active record consumer.
- */
- private lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(WRITE_SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: ResourceState) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- record: ResourceState,
- ) {
- consumer.startMessage()
-
- consumer.startField("id", 0)
- consumer.addBinary(Binary.fromCharSequence(record.id))
- consumer.endField("id", 0)
-
- consumer.startField("timestamp", 1)
- consumer.addLong(record.timestamp.toEpochMilli())
- consumer.endField("timestamp", 1)
-
- consumer.startField("duration", 2)
- consumer.addLong(record.duration.toMillis())
- consumer.endField("duration", 2)
-
- consumer.startField("cpu_count", 3)
- consumer.addInteger(record.cpuCount)
- consumer.endField("cpu_count", 3)
-
- consumer.startField("cpu_usage", 4)
- consumer.addDouble(record.cpuUsage)
- consumer.endField("cpu_usage", 4)
-
- consumer.endMessage()
- }
-
- companion object {
- /**
- * Parquet schema for the "resource states" table in the trace.
- */
- @JvmStatic
- val WRITE_SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage"),
- )
- .named("resource_state")
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
deleted file mode 100644
index ed62e2ce..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.opendc.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import kotlin.math.roundToLong
-
-/**
- * Support for writing [Resource] instances to Parquet format.
- */
-internal class ResourceWriteSupport : WriteSupport<Resource>() {
- /**
- * The current active record consumer.
- */
- private lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(WRITE_SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: Resource) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- record: Resource,
- ) {
- consumer.startMessage()
-
- consumer.startField("id", 0)
- consumer.addBinary(Binary.fromCharSequence(record.id))
- consumer.endField("id", 0)
-
- consumer.startField("start_time", 1)
- consumer.addLong(record.startTime.toEpochMilli())
- consumer.endField("start_time", 1)
-
- consumer.startField("stop_time", 2)
- consumer.addLong(record.stopTime.toEpochMilli())
- consumer.endField("stop_time", 2)
-
- consumer.startField("cpu_count", 3)
- consumer.addInteger(record.cpuCount)
- consumer.endField("cpu_count", 3)
-
- consumer.startField("cpu_capacity", 4)
- consumer.addDouble(record.cpuCapacity)
- consumer.endField("cpu_capacity", 4)
-
- consumer.startField("mem_capacity", 5)
- consumer.addLong(record.memCapacity.roundToLong())
- consumer.endField("mem_capacity", 5)
-
- consumer.endMessage()
- }
-
- companion object {
- /**
- * Parquet schema for the "resources" table in the trace.
- */
- @JvmStatic
- val WRITE_SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("start_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("stop_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- )
- .named("resource")
- }
-}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
deleted file mode 100644
index 94094af4..00000000
--- a/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
+++ /dev/null
@@ -1 +0,0 @@
-org.opendc.trace.opendc.OdcVmTraceFormat