diff options
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main/kotlin')
55 files changed, 963 insertions, 5179 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt index de74c4fd..32cdd78b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt @@ -20,16 +20,16 @@ * SOFTWARE. */ -@file:JvmName("CarbonIntensityColumns") +@file:JvmName("CarbonColumns") package org.opendc.trace.conv /** - * A column containing the task identifier. + * A column containing the timestamp of the carbon intensity measurement. */ -public const val CARBON_INTENSITY_TIMESTAMP: String = "timestamp" +public const val CARBON_TIMESTAMP: String = "timestamp" /** - * A column containing the task identifier. + * A column containing the intensity of the carbon when sampled. */ -public const val CARBON_INTENSITY_VALUE: String = "carbon_intensity" +public const val CARBON_INTENSITY: String = "carbon_intensity" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt index fbbfdea9..e0d01ef2 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt @@ -20,21 +20,26 @@ * SOFTWARE. */ -@file:JvmName("InterferenceGroupColumns") +@file:JvmName("FragmentColumns") package org.opendc.trace.conv /** - * Members of the interference group. + * Duration for the fragment. */ -public const val INTERFERENCE_GROUP_MEMBERS: String = "members" +public const val FRAGMENT_DURATION: String = "duration" /** - * Target load after which the interference occurs. + * Total CPU usage during the fragment in MHz. */ -public const val INTERFERENCE_GROUP_TARGET: String = "target" +public const val FRAGMENT_CPU_USAGE: String = "cpu_usage" /** - * Performance score when the interference occurs. + * Total GPU usage during the fragment in MHz. */ -public const val INTERFERENCE_GROUP_SCORE: String = "score" +public const val FRAGMENT_GPU_USAGE: String = "gpu_usage" + +/** + * Memory usage during the fragment in KB. + */ +public const val FRAGMENT_MEM_USAGE: String = "mem_usage" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt deleted file mode 100644 index 3d0341b2..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ResourceColumns") - -package org.opendc.trace.conv - -/** - * Identifier of the resource. - */ -@JvmField -public val resourceID: String = "id" - -/** - * The cluster to which the resource belongs. - */ -@JvmField -public val resourceClusterID: String = "cluster_id" - -/** - * Start time for the resource. - */ -@JvmField -public val resourceSubmissionTime: String = "submission_time" - -/** - * Carbon intensity of the resource. - */ -@JvmField -public val resourceCarbonIntensity: String = "carbon_intensity" - -/** - * End time for the resource. - */ -@JvmField -public val resourceDuration: String = "duration" - -/** - * Number of CPUs for the resource. - */ -@JvmField -public val resourceCpuCount: String = "cpu_count" - -/** - * Total CPU capacity of the resource in MHz. - */ -@JvmField -public val resourceCpuCapacity: String = "cpu_capacity" - -/** - * Memory capacity for the resource in KB. - */ -@JvmField -public val resourceMemCapacity: String = "mem_capacity" - -/** - * Number of GPU cores for the resource. - */ -@JvmField -public val resourceGpuCount: String = "gpu_count" - -/** - * Total GPU capacity of the resource in MHz. - */ -@JvmField -public val resourceGpuCapacity: String = "gpu_capacity" - -/** - * Total GPU memory capacity of the resource in MB. - */ -@JvmField -public val resourceGpuMemCapacity: String = "gpu_mem_capacity" - -/** - * The parents of the resource that need to be completed before this resource can be used. - */ -@JvmField -public val resourceParents: String = "parents" - -/** - * The children of the resource that cannot be started before this is completed. - */ -@JvmField -public val resourceChildren: String = "children" - -/** - * Nature of the task. Delayable, interruptible, etc. - */ -@JvmField -public val resourceNature: String = "nature" - -/** - * Deadline of the task. - */ -@JvmField -public val resourceDeadline: String = "deadline" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt deleted file mode 100644 index f4ab7759..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ResourceStateColumns") - -package org.opendc.trace.conv - -/** - * The timestamp at which the state was recorded. - */ -@JvmField -public val resourceStateTimestamp: String = "timestamp" - -/** - * Duration for the state. - */ -@JvmField -public val resourceStateDuration: String = "duration" - -/** - * A flag to indicate that the resource is powered on. - */ -@JvmField -public val resourceStatePoweredOn: String = "powered_on" - -/** - * Total CPU usage of the resource in MHz. - */ -@JvmField -public val resourceStateCpuUsage: String = "cpu_usage" - -/** - * Total CPU usage of the resource in percentage. - */ -@JvmField -public val resourceStateCpuUsagePct: String = "cpu_usage_pct" - -/** - * Total CPU demand of the resource in MHz. - */ -@JvmField -public val resourceStateCpuDemand: String = "cpu_demand" - -/** - * CPU ready percentage. - */ -@JvmField -public val resourceStateCpuReadyPct: String = "cpu_ready_pct" - -/** - * Memory usage of the resource in KB. - */ -@JvmField -public val resourceStateMemUsage: String = "mem_usage" - -/** - * Disk read throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateDiskRead: String = "disk_read" - -/** - * Disk write throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateDiskWrite: String = "disk_write" - -/** - * Network receive throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateNetRx: String = "net_rx" - -/** - * Network transmit throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateNetTx: String = "net_tx" - -/** - * Total GPU capacity of the resource in MHz. - */ -@JvmField -public val resourceStateGpuUsage: String = "gpu_usage" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt index d4019f73..310d268a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt @@ -25,30 +25,21 @@ package org.opendc.trace.conv /** - * A table containing all workflows in a workload. - */ -public const val TABLE_WORKFLOWS: String = "workflows" - -/** * A table containing all tasks in a workload. */ public const val TABLE_TASKS: String = "tasks" /** - * A table containing all resources in a workload. + * A table containing all resource states in a workload. */ -public const val TABLE_RESOURCES: String = "resources" +public const val TABLE_FRAGMENTS: String = "fragments" /** - * A table containing all resource states in a workload. + * A table containing the carbon intensities of the region */ -public const val TABLE_RESOURCE_STATES: String = "resource_states" +public const val TABLE_CARBON: String = "carbon" /** - * A table containing the groups of resources that interfere when run on the same execution platform. + * A table containing failures that can be injected during simulation. */ -public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups" - -public const val TABLE_CARBON_INTENSITIES: String = "carbon_intensities" - public const val TABLE_FAILURES: String = "failures" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index 6ca87a60..0df52c71 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -25,61 +25,71 @@ package org.opendc.trace.conv /** - * A column containing the task identifier. + * Identifier of the task. */ public const val TASK_ID: String = "id" /** - * A column containing the identifier of the workflow. + * Identifier of the task. */ -public const val TASK_WORKFLOW_ID: String = "workflow_id" +public const val TASK_NAME: String = "name" /** - * A column containing the submission time of the task. + * The time of submission of the task. */ -public const val TASK_SUBMIT_TIME: String = "submit_time" +public const val TASK_SUBMISSION_TIME: String = "submission_time" /** - * A column containing the wait time of the task. + * The duration of a task in ms */ -public const val TASK_WAIT_TIME: String = "wait_time" +public const val TASK_DURATION: String = "duration" /** - * A column containing the runtime time of the task. + * Number of CPUs for the task. */ -public const val TASK_RUNTIME: String = "runtime" +public const val TASK_CPU_COUNT: String = "cpu_count" /** - * A column containing the parents of a task. + * Total CPU capacity of the task in MHz. */ -public const val TASK_PARENTS: String = "parents" +public const val TASK_CPU_CAPACITY: String = "cpu_capacity" /** - * A column containing the children of a task. + * Memory capacity for the task in KB. */ -public const val TASK_CHILDREN: String = "children" +public const val TASK_MEM_CAPACITY: String = "mem_capacity" + +/** + * Number of GPU cores for the task. + */ +public const val TASK_GPU_COUNT: String = "gpu_count" /** - * A column containing the requested CPUs of a task. + * Total GPU capacity of the task in MHz. */ -public const val TASK_REQ_NCPUS: String = "req_ncpus" +public const val TASK_GPU_CAPACITY: String = "gpu_capacity" /** - * A column containing the allocated CPUs of a task. + * Total GPU memory capacity of the task in MB. */ -public const val TASK_ALLOC_NCPUS: String = "alloc_ncpus" +public const val TASK_GPU_MEM_CAPACITY: String = "gpu_mem_capacity" /** - * A column containing the status of a task. + * The parents of the task that need to be completed before this task can be used. */ -public const val TASK_STATUS: String = "status" +public const val TASK_PARENTS: String = "parents" + +/** + * The children of the task that cannot be started before this is completed. + */ +public const val TASK_CHILDREN: String = "children" /** - * A column containing the group id of a task. + * Nature of the task. Delayable, interruptible, etc. */ -public const val TASK_GROUP_ID: String = "group_id" +public const val TASK_NATURE: String = "nature" /** - * A column containing the user id of a task. + * Deadline of the task. */ -public const val TASK_USER_ID: String = "user_id" +public const val TASK_DEADLINE: String = "deadline" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt deleted file mode 100644 index bcf6ff52..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateTimestamp -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Azure v1 VM resource state table. - */ -internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. - */ - private var isStarted = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) - "vm id" -> id = parser.text - "CPU avg cpu" -> cpuUsagePct = (parser.doubleValue / 100.0) // Convert from % to [0, 1] - } - } - - return true - } - - private val colID = 0 - private val colTimestamp = 1 - private val colCpuUsagePct = 2 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceStateCpuUsagePct -> colCpuUsagePct - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colCpuUsagePct) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - throw IllegalArgumentException("Invalid column") - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colCpuUsagePct -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String? { - checkActive() - return when (index) { - colID -> id - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colTimestamp -> timestamp - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var timestamp: Instant? = null - private var cpuUsagePct = Double.NaN - - /** - * Reset the state. - */ - private fun reset() { - id = null - timestamp = null - cpuUsagePct = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = - CsvSchema.builder() - .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) - .addColumn("vm id", CsvSchema.ColumnType.STRING) - .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt deleted file mode 100644 index 55f26fa6..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceSubmissionTime -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Azure v1 VM resources table. - */ -internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. - */ - private var isStarted = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "vm id" -> id = parser.text - "timestamp vm created" -> startTime = Instant.ofEpochSecond(parser.longValue) - "timestamp vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue) - "vm virtual core count" -> cpuCores = parser.intValue - "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB - } - } - - return true - } - - private val colID = 0 - private val colStartTime = 1 - private val colStopTime = 2 - private val colCpuCount = 3 - private val colMemCapacity = 4 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceSubmissionTime -> colStartTime - resourceDuration -> colStopTime - resourceCpuCount -> colCpuCount - resourceMemCapacity -> colMemCapacity - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colMemCapacity) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colCpuCount -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - checkActive() - return when (index) { - colCpuCount -> cpuCores.toLong() - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colMemCapacity -> memCapacity - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String? { - checkActive() - return when (index) { - colID -> id - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colStartTime -> startTime - colStopTime -> stopTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var startTime: Instant? = null - private var stopTime: Instant? = null - private var cpuCores = -1 - private var memCapacity = Double.NaN - - /** - * Reset the state. - */ - private fun reset() { - id = null - startTime = null - stopTime = null - cpuCores = -1 - memCapacity = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = - CsvSchema.builder() - .addColumn("vm id", CsvSchema.ColumnType.NUMBER) - .addColumn("subscription id", CsvSchema.ColumnType.STRING) - .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) - .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("vm category", CsvSchema.ColumnType.NUMBER) - .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) - .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt deleted file mode 100644 index 7ce1c11a..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import java.util.zip.GZIPInputStream -import kotlin.io.path.inputStream -import kotlin.io.path.name - -/** - * A format implementation for the Azure v1 format. - */ -public class AzureTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "azure" - - /** - * The [CsvFactory] used to create the parser. - */ - private val factory = - CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceSubmissionTime, TableColumnType.Instant), - TableColumn(resourceDuration, TableColumnType.Instant), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceMemCapacity, TableColumnType.Double), - ), - ) - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_RESOURCES -> { - val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) - AzureResourceTableReader(factory.createParser(stream)) - } - TABLE_RESOURCE_STATES -> newResourceStateReader(path) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } - - /** - * Construct a [TableReader] for reading over all VM CPU readings. - */ - private fun newResourceStateReader(path: Path): TableReader { - val partitions = - Files.walk(path.resolve("vm_cpu_readings"), 1) - .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") } - .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it })) - .toSortedMap() - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, partPath) = it.next() - val stream = GZIPInputStream(partPath.inputStream()) - return AzureResourceStateTableReader(factory.createParser(stream)) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt deleted file mode 100644 index 8387d1ed..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceClusterID -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuDemand -import org.opendc.trace.conv.resourceStateCpuReadyPct -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateTimestamp -import java.io.BufferedReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Bitbrains resource state table. - */ -internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader { - private var state = State.Pending - - override fun nextRow(): Boolean { - val state = state - if (state == State.Closed) { - return false - } else if (state == State.Pending) { - this.state = State.Active - } - - reset() - - var line: String? - var num = 0 - - while (true) { - line = reader.readLine() - - if (line == null) { - this.state = State.Closed - return false - } - - num++ - - if (line[0] == '#' || line.isBlank()) { - // Ignore empty lines or comments - continue - } - - break - } - - line = line!!.trim() - - val length = line.length - var col = 0 - var start: Int - var end = 0 - - while (end < length) { - // Trim all whitespace before the field - start = end - while (start < length && line[start].isWhitespace()) { - start++ - } - - end = line.indexOf(' ', start) - - if (end < 0) { - end = length - } - - val field = line.subSequence(start, end) as String - when (col++) { - colTimestamp -> timestamp = Instant.ofEpochSecond(field.toLong(10)) - colCpuUsage -> cpuUsage = field.toDouble() - colCpuDemand -> cpuDemand = field.toDouble() - colDiskRead -> diskRead = field.toDouble() - colDiskWrite -> diskWrite = field.toDouble() - colClusterID -> cluster = field.trim() - colNcpus -> cpuCores = field.toInt(10) - colCpuReadyPct -> cpuReadyPct = field.toDouble() - colPoweredOn -> poweredOn = field.toInt(10) == 1 - colCpuCapacity -> cpuCapacity = field.toDouble() - colID -> id = field.trim() - colMemCapacity -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB - } - } - - return true - } - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceClusterID -> colClusterID - resourceStateTimestamp -> colTimestamp - resourceCpuCount -> colNcpus - resourceCpuCapacity -> colCpuCapacity - resourceStateCpuUsage -> colCpuUsage - resourceStateCpuUsagePct -> colCpuUsagePct - resourceStateCpuDemand -> colCpuDemand - resourceStateCpuReadyPct -> colCpuReadyPct - resourceMemCapacity -> colMemCapacity - resourceStateDiskRead -> colDiskRead - resourceStateDiskWrite -> colDiskWrite - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0 until colMax) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - check(state == State.Active) { "No active row" } - return when (index) { - colPoweredOn -> poweredOn - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getInt(index: Int): Int { - check(state == State.Active) { "No active row" } - return when (index) { - colNcpus -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - check(state == State.Active) { "No active row" } - return when (index) { - colCpuCapacity -> cpuCapacity - colCpuUsage -> cpuUsage - colCpuUsagePct -> cpuUsage / cpuCapacity - colCpuReadyPct -> cpuReadyPct - colCpuDemand -> cpuDemand - colMemCapacity -> memCapacity - colDiskRead -> diskRead - colDiskWrite -> diskWrite - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String? { - check(state == State.Active) { "No active row" } - return when (index) { - colID -> id - colClusterID -> cluster - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - check(state == State.Active) { "No active row" } - return when (index) { - colTimestamp -> timestamp - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - reset() - state = State.Closed - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var cluster: String? = null - private var timestamp: Instant? = null - private var cpuCores = -1 - private var cpuCapacity = Double.NaN - private var cpuUsage = Double.NaN - private var cpuDemand = Double.NaN - private var cpuReadyPct = Double.NaN - private var memCapacity = Double.NaN - private var diskRead = Double.NaN - private var diskWrite = Double.NaN - private var poweredOn: Boolean = false - - /** - * Reset the state of the reader. - */ - private fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } - - /** - * Default column indices for the extended Bitbrains format. - */ - private val colTimestamp = 0 - private val colCpuUsage = 1 - private val colCpuDemand = 2 - private val colDiskRead = 4 - private val colDiskWrite = 6 - private val colClusterID = 10 - private val colNcpus = 12 - private val colCpuReadyPct = 13 - private val colPoweredOn = 14 - private val colCpuCapacity = 18 - private val colID = 19 - private val colMemCapacity = 20 - private val colCpuUsagePct = 21 - private val colMax = colCpuUsagePct + 1 - - private enum class State { - Pending, - Active, - Closed, - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt deleted file mode 100644 index 6115953f..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceClusterID -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuDemand -import org.opendc.trace.conv.resourceStateCpuReadyPct -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * A format implementation for the extended Bitbrains trace format. - */ -public class BitbrainsExTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "bitbrains-ex" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceClusterID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceCpuCapacity, TableColumnType.Double), - TableColumn(resourceStateCpuUsage, TableColumnType.Double), - TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), - TableColumn(resourceStateCpuDemand, TableColumnType.Double), - TableColumn(resourceStateCpuReadyPct, TableColumnType.Double), - TableColumn(resourceMemCapacity, TableColumnType.Double), - TableColumn(resourceStateDiskRead, TableColumnType.Double), - TableColumn(resourceStateDiskWrite, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_RESOURCE_STATES -> newResourceStateReader(path) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } - - /** - * Construct a [TableReader] for reading over all resource state partitions. - */ - private fun newResourceStateReader(path: Path): TableReader { - val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, partPath) = it.next() - return BitbrainsExResourceStateTableReader(partPath.bufferedReader()) - } else { - null - } - } - - override fun toString(): String = "BitbrainsExCompositeTableReader" - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt deleted file mode 100644 index e264fccb..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateMemUsage -import org.opendc.trace.conv.resourceStateNetRx -import org.opendc.trace.conv.resourceStateNetTx -import org.opendc.trace.conv.resourceStateTimestamp -import java.text.NumberFormat -import java.time.Duration -import java.time.Instant -import java.time.LocalDateTime -import java.time.ZoneOffset -import java.time.format.DateTimeFormatter -import java.time.format.DateTimeParseException -import java.util.Locale -import java.util.UUID - -/** - * A [TableReader] for the Bitbrains resource state table. - */ -internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. - */ - private var isStarted = false - - /** - * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace. - */ - private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss") - - /** - * The type of timestamps in the trace. - */ - private var timestampType: TimestampType = TimestampType.UNDECIDED - - /** - * The [NumberFormat] used to parse doubles containing a comma. - */ - private val nf = NumberFormat.getInstance(Locale.GERMAN) - - /** - * A flag to indicate that the trace contains decimals with a comma separator. - */ - private var usesCommaDecimalSeparator = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - // Reset the row state - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "Timestamp [ms]" -> { - timestamp = - when (timestampType) { - TimestampType.UNDECIDED -> { - try { - val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) - timestampType = TimestampType.DATE_TIME - res - } catch (e: DateTimeParseException) { - timestampType = TimestampType.EPOCH_MILLIS - Instant.ofEpochSecond(parser.longValue) - } - } - TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) - TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) - } - } - "CPU cores" -> cpuCores = parser.intValue - "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble() - "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble() - "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1] - "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble() - "Memory usage [KB]" -> memUsage = parseSafeDouble() - "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble() - "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble() - "Network received throughput [KB/s]" -> netReceived = parseSafeDouble() - "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble() - } - } - - return true - } - - private val colTimestamp = 0 - private val colCpuCount = 1 - private val colCpuCapacity = 2 - private val colCpuUsage = 3 - private val colCpuUsagePct = 4 - private val colMemCapacity = 5 - private val colMemUsage = 6 - private val colDiskRead = 7 - private val colDiskWrite = 8 - private val colNetRx = 9 - private val colNetTx = 10 - private val colID = 11 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceStateCpuUsage -> colCpuUsage - resourceStateCpuUsagePct -> colCpuUsagePct - resourceMemCapacity -> colMemCapacity - resourceStateMemUsage -> colMemUsage - resourceStateDiskRead -> colDiskRead - resourceStateDiskWrite -> colDiskWrite - resourceStateNetRx -> colNetRx - resourceStateNetTx -> colNetTx - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colCpuCount -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colCpuCapacity -> cpuCapacity - colCpuUsage -> cpuUsage - colCpuUsagePct -> cpuUsagePct - colMemCapacity -> memCapacity - colMemUsage -> memUsage - colDiskRead -> diskRead - colDiskWrite -> diskWrite - colNetRx -> netReceived - colNetTx -> netTransmitted - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String { - checkActive() - return when (index) { - colID -> partition - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colTimestamp -> timestamp - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * Try to parse the current value safely as double. - */ - private fun parseSafeDouble(): Double { - if (!usesCommaDecimalSeparator) { - try { - return parser.doubleValue - } catch (e: JsonParseException) { - usesCommaDecimalSeparator = true - } - } - - val text = parser.text - if (text.isBlank()) { - return 0.0 - } - - return nf.parse(text).toDouble() - } - - /** - * State fields of the reader. - */ - private var timestamp: Instant? = null - private var cpuCores = -1 - private var cpuCapacity = Double.NaN - private var cpuUsage = Double.NaN - private var cpuUsagePct = Double.NaN - private var memCapacity = Double.NaN - private var memUsage = Double.NaN - private var diskRead = Double.NaN - private var diskWrite = Double.NaN - private var netReceived = Double.NaN - private var netTransmitted = Double.NaN - - /** - * Reset the state. - */ - private fun reset() { - timestamp = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuUsagePct = Double.NaN - memCapacity = Double.NaN - memUsage = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - netReceived = Double.NaN - netTransmitted = Double.NaN - } - - /** - * The type of the timestamp in the trace. - */ - private enum class TimestampType { - UNDECIDED, - DATE_TIME, - EPOCH_MILLIS, - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = - CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt deleted file mode 100644 index a12785f0..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceID -import java.nio.file.Path -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Bitbrains resource table. - */ -internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader { - /** - * An iterator to iterate over the resource entries. - */ - private val it = vms.iterator() - - /** - * The state of the reader. - */ - private var state = State.Pending - - override fun nextRow(): Boolean { - if (state == State.Pending) { - state = State.Active - } - - reset() - - while (it.hasNext()) { - val (name, path) = it.next() - - val parser = factory.createParser(path.toFile()) - val reader = BitbrainsResourceStateTableReader(name, parser) - val idCol = reader.resolve(resourceID) - - try { - if (!reader.nextRow()) { - continue - } - - id = reader.getString(idCol) - return true - } finally { - reader.close() - } - } - - state = State.Closed - return false - } - - private val colID = 0 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - throw IllegalArgumentException("Invalid column") - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String? { - check(state == State.Active) { "No active row" } - return when (index) { - colID -> id - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - throw IllegalArgumentException("Invalid column") - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reset() - state = State.Closed - } - - /** - * State fields of the reader. - */ - private var id: String? = null - - /** - * Reset the state of the reader. - */ - private fun reset() { - id = null - } - - private enum class State { - Pending, - Active, - Closed, - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt deleted file mode 100644 index 23853077..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateMemUsage -import org.opendc.trace.conv.resourceStateNetRx -import org.opendc.trace.conv.resourceStateNetTx -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * A format implementation for the GWF trace format. - */ -public class BitbrainsTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "bitbrains" - - /** - * The [CsvFactory] used to create the parser. - */ - private val factory = - CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - ), - ) - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceCpuCapacity, TableColumnType.Double), - TableColumn(resourceStateCpuUsage, TableColumnType.Double), - TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), - TableColumn(resourceMemCapacity, TableColumnType.Double), - TableColumn(resourceStateMemUsage, TableColumnType.Double), - TableColumn(resourceStateDiskRead, TableColumnType.Double), - TableColumn(resourceStateDiskWrite, TableColumnType.Double), - TableColumn(resourceStateNetRx, TableColumnType.Double), - TableColumn(resourceStateNetTx, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_RESOURCES -> { - val vms = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - BitbrainsResourceTableReader(factory, vms) - } - TABLE_RESOURCE_STATES -> newResourceStateReader(path) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } - - /** - * Construct a [TableReader] for reading over all resource state partitions. - */ - private fun newResourceStateReader(path: Path): TableReader { - val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (partition, partPath) = it.next() - return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile())) - } else { - null - } - } - - override fun toString(): String = "BitbrainsCompositeTableReader" - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt index 226c8806..c5face9f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt @@ -23,9 +23,9 @@ package org.opendc.trace.formats.carbon import org.opendc.trace.TableReader -import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP -import org.opendc.trace.conv.CARBON_INTENSITY_VALUE -import org.opendc.trace.formats.carbon.parquet.CarbonIntensityFragment +import org.opendc.trace.conv.CARBON_INTENSITY +import org.opendc.trace.conv.CARBON_TIMESTAMP +import org.opendc.trace.formats.carbon.parquet.CarbonFragment import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant @@ -34,11 +34,11 @@ import java.util.UUID /** * A [TableReader] implementation for the WTF format. */ -internal class CarbonTableReader(private val reader: LocalParquetReader<CarbonIntensityFragment>) : TableReader { +internal class CarbonTableReader(private val reader: LocalParquetReader<CarbonFragment>) : TableReader { /** * The current record. */ - private var record: CarbonIntensityFragment? = null + private var record: CarbonFragment? = null override fun nextRow(): Boolean { try { @@ -57,8 +57,8 @@ internal class CarbonTableReader(private val reader: LocalParquetReader<CarbonIn override fun resolve(name: String): Int { return when (name) { - CARBON_INTENSITY_TIMESTAMP -> colTimestamp - CARBON_INTENSITY_VALUE -> colCarbonIntensity + CARBON_TIMESTAMP -> colTimestamp + CARBON_INTENSITY -> colCarbonIntensity else -> -1 } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt index d8adc739..764bb349 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt @@ -26,10 +26,10 @@ import org.opendc.trace.TableColumn import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader import org.opendc.trace.TableWriter -import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP -import org.opendc.trace.conv.CARBON_INTENSITY_VALUE -import org.opendc.trace.conv.TABLE_CARBON_INTENSITIES -import org.opendc.trace.formats.carbon.parquet.CarbonIntensityReadSupport +import org.opendc.trace.conv.CARBON_INTENSITY +import org.opendc.trace.conv.CARBON_TIMESTAMP +import org.opendc.trace.conv.TABLE_CARBON +import org.opendc.trace.formats.carbon.parquet.CarbonReadSupport import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalParquetReader @@ -45,18 +45,18 @@ public class CarbonTraceFormat : TraceFormat { throw UnsupportedOperationException("Writing not supported for this format") } - override fun getTables(path: Path): List<String> = listOf(TABLE_CARBON_INTENSITIES) + override fun getTables(path: Path): List<String> = listOf(TABLE_CARBON) override fun getDetails( path: Path, table: String, ): TableDetails { return when (table) { - TABLE_CARBON_INTENSITIES -> + TABLE_CARBON -> TableDetails( listOf( - TableColumn(CARBON_INTENSITY_TIMESTAMP, TableColumnType.Instant), - TableColumn(CARBON_INTENSITY_VALUE, TableColumnType.Double), + TableColumn(CARBON_TIMESTAMP, TableColumnType.Instant), + TableColumn(CARBON_INTENSITY, TableColumnType.Double), ), ) else -> throw IllegalArgumentException("Table $table not supported") @@ -69,8 +69,8 @@ public class CarbonTraceFormat : TraceFormat { projection: List<String>?, ): TableReader { return when (table) { - TABLE_CARBON_INTENSITIES -> { - val reader = LocalParquetReader(path, CarbonIntensityReadSupport(projection)) + TABLE_CARBON -> { + val reader = LocalParquetReader(path, CarbonReadSupport(projection)) CarbonTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt index 3211cb6c..fe05876b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt @@ -27,7 +27,7 @@ import java.time.Instant /** * A task in the Workflow Trace Format. */ -internal data class CarbonIntensityFragment( +internal data class CarbonFragment( val timestamp: Instant, val carbonIntensity: Double, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt index 2f4eac05..53087079 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt @@ -26,26 +26,24 @@ import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types -import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP -import org.opendc.trace.conv.CARBON_INTENSITY_VALUE +import org.opendc.trace.conv.CARBON_INTENSITY +import org.opendc.trace.conv.CARBON_TIMESTAMP /** - * A [ReadSupport] instance for [Task] objects. + * A [ReadSupport] instance for [CarbonFragment] objects. * * @param projection The projection of the table to read. */ -internal class CarbonIntensityReadSupport(private val projection: List<String>?) : ReadSupport<CarbonIntensityFragment>() { +internal class CarbonReadSupport(private val projection: List<String>?) : ReadSupport<CarbonFragment>() { /** * Mapping of table columns to their Parquet column names. */ private val colMap = mapOf( - CARBON_INTENSITY_TIMESTAMP to "timestamp", - CARBON_INTENSITY_VALUE to "carbon_intensity", + CARBON_TIMESTAMP to "timestamp", + CARBON_INTENSITY to "carbon_intensity", ) override fun init(context: InitContext): ReadContext { @@ -53,16 +51,16 @@ internal class CarbonIntensityReadSupport(private val projection: List<String>?) if (projection != null) { Types.buildMessage() .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + val fieldByName = CARBON_SCHEMA.fields.associateBy { it.name } for (col in projection) { val fieldName = colMap[col] ?: continue addField(fieldByName.getValue(fieldName)) } } - .named(READ_SCHEMA.name) + .named(CARBON_SCHEMA.name) } else { - READ_SCHEMA + CARBON_SCHEMA } return ReadContext(projectedSchema) } @@ -72,24 +70,5 @@ internal class CarbonIntensityReadSupport(private val projection: List<String>?) keyValueMetaData: Map<String, String>, fileSchema: MessageType, readContext: ReadContext, - ): RecordMaterializer<CarbonIntensityFragment> = CarbonIntensityRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_intensity"), - ) - .named("carbon_intensity_fragment") - } + ): RecordMaterializer<CarbonFragment> = CarbonRecordMaterializer(readContext.requestedSchema) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt index f5d68f22..aa915a39 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt @@ -30,9 +30,9 @@ import org.apache.parquet.schema.MessageType import java.time.Instant /** - * A [RecordMaterializer] for [Task] records. + * A [RecordMaterializer] for [CarbonFragment] records. */ -internal class CarbonIntensityRecordMaterializer(schema: MessageType) : RecordMaterializer<CarbonIntensityFragment>() { +internal class CarbonRecordMaterializer(schema: MessageType) : RecordMaterializer<CarbonFragment>() { /** * State of current record being read. */ @@ -76,8 +76,8 @@ internal class CarbonIntensityRecordMaterializer(schema: MessageType) : RecordMa override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] } - override fun getCurrentRecord(): CarbonIntensityFragment = - CarbonIntensityFragment( + override fun getCurrentRecord(): CarbonFragment = + CarbonFragment( localTimestamp, localCarbonIntensity, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt new file mode 100644 index 00000000..c8b11968 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types + +private val CARBON_SCHEMA_v1: MessageType = + Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("carbon_intensity"), + ) + .named("carbon_intensity_fragment") + +public val CARBON_SCHEMA: MessageType = CARBON_SCHEMA_v1 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt index d49f86c6..9bd8fd72 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt @@ -27,14 +27,13 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types import org.opendc.trace.conv.FAILURE_DURATION import org.opendc.trace.conv.FAILURE_INTENSITY import org.opendc.trace.conv.FAILURE_INTERVAL /** - * A [ReadSupport] instance for [Task] objects. + * A [ReadSupport] instance for [FailureFragment] objects. * * @param projection The projection of the table to read. */ @@ -54,16 +53,16 @@ internal class FailureReadSupport(private val projection: List<String>?) : ReadS if (projection != null) { Types.buildMessage() .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + val fieldByName = FAILURE_SCHEMA.fields.associateBy { it.name } for (col in projection) { val fieldName = colMap[col] ?: continue addField(fieldByName.getValue(fieldName)) } } - .named(READ_SCHEMA.name) + .named(FAILURE_SCHEMA.name) } else { - READ_SCHEMA + FAILURE_SCHEMA } return ReadContext(projectedSchema) } @@ -74,25 +73,4 @@ internal class FailureReadSupport(private val projection: List<String>?) : ReadS fileSchema: MessageType, readContext: ReadContext, ): RecordMaterializer<FailureFragment> = FailureRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("failure_interval"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("failure_duration"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("failure_intensity"), - ) - .named("failure_fragment") - } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt index 5a00f8c9..83281984 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt @@ -29,7 +29,7 @@ import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.MessageType /** - * A [RecordMaterializer] for [Task] records. + * A [RecordMaterializer] for [FailureFragment] records. */ internal class FailureRecordMaterializer(schema: MessageType) : RecordMaterializer<FailureFragment>() { /** diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt new file mode 100644 index 00000000..bafac387 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.failure.parquet + +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types + +private val FAILURE_SCHEMA_v1: MessageType = + Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("failure_interval"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("failure_duration"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("failure_intensity"), + ) + .named("failure_fragment") + +public val FAILURE_SCHEMA: MessageType = FAILURE_SCHEMA_v1 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt deleted file mode 100644 index 8a2a99cb..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import java.time.Duration -import java.time.Instant -import java.util.UUID -import java.util.regex.Pattern - -/** - * A [TableReader] implementation for the GWF format. - */ -internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. - */ - private var isStarted = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - // Reset the row state - reset() - - if (parser.isClosed || !nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "WorkflowID" -> workflowId = parser.text - "JobID" -> jobId = parser.text - "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) - "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) - "NProcs" -> nProcs = parser.intValue - "ReqNProcs" -> reqNProcs = parser.intValue - "Dependencies" -> dependencies = parseParents(parser.valueAsString) - } - } - - return true - } - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colJobID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_RUNTIME -> colRuntime - TASK_ALLOC_NCPUS -> colNproc - TASK_REQ_NCPUS -> colReqNproc - TASK_PARENTS -> colDeps - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colDeps) { "Invalid column" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colReqNproc -> reqNProcs - colNproc -> nProcs - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String? { - checkActive() - return when (index) { - colJobID -> jobId - colWorkflowID -> workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colSubmitTime -> submitTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - checkActive() - return when (index) { - colRuntime -> runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - checkActive() - return when (index) { - colDeps -> typeDeps.convertTo(dependencies, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * The pattern used to parse the parents. - */ - private val pattern = Pattern.compile("\\s+") - - /** - * Parse the parents into a set of longs. - */ - private fun parseParents(value: String): Set<String> { - val result = mutableSetOf<String>() - val deps = value.split(pattern) - - for (dep in deps) { - if (dep.isBlank()) { - continue - } - - result.add(dep) - } - - return result - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * Reader state fields. - */ - private var workflowId: String? = null - private var jobId: String? = null - private var submitTime: Instant? = null - private var runtime: Duration? = null - private var nProcs = -1 - private var reqNProcs = -1 - private var dependencies = emptySet<String>() - - /** - * Reset the state. - */ - private fun reset() { - workflowId = null - jobId = null - submitTime = null - runtime = null - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } - - private val colWorkflowID = 0 - private val colJobID = 1 - private val colSubmitTime = 2 - private val colRuntime = 3 - private val colNproc = 4 - private val colReqNproc = 5 - private val colDeps = 6 - - private val typeDeps = TableColumnType.Set(TableColumnType.String) - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = - CsvSchema.builder() - .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER) - .addColumn("JobID", CsvSchema.ColumnType.NUMBER) - .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER) - .addColumn("RunTime", CsvSchema.ColumnType.NUMBER) - .addColumn("NProcs", CsvSchema.ColumnType.NUMBER) - .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER) - .addColumn("Dependencies", CsvSchema.ColumnType.STRING) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(',') - .build() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt deleted file mode 100644 index 097c5593..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the GWF trace format. - */ -public class GwfTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "gwf" - - /** - * The [CsvFactory] used to create the parser. - */ - private val factory = - CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt deleted file mode 100644 index dba971d7..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.util.convertTo -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the OpenDC VM interference JSON format. - */ -internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. - */ - private var isStarted = false - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - - parser.nextToken() - - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") - } - } - - return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) { - parser.close() - reset() - false - } else { - parseGroup(parser) - true - } - } - - private val colMembers = 0 - private val colTarget = 1 - private val colScore = 2 - - private val typeMembers = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - INTERFERENCE_GROUP_MEMBERS -> colMembers - INTERFERENCE_GROUP_TARGET -> colTarget - INTERFERENCE_GROUP_SCORE -> colScore - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - return when (index) { - colMembers, colTarget, colScore -> false - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getInt(index: Int): Int { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colTarget -> targetLoad - colScore -> score - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun getString(index: Int): String? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getInstant(index: Int): Instant? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - checkActive() - return when (index) { - colMembers -> typeMembers.convertTo(members, elementType) - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun close() { - parser.close() - } - - private var members = emptySet<String>() - private var targetLoad = Double.POSITIVE_INFINITY - private var score = 1.0 - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Reset the state. - */ - private fun reset() { - members = emptySet() - targetLoad = Double.POSITIVE_INFINITY - score = 1.0 - } - - /** - * Parse a group an interference JSON file. - */ - private fun parseGroup(parser: JsonParser) { - var targetLoad = Double.POSITIVE_INFINITY - var score = 1.0 - val members = mutableSetOf<String>() - - if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "vms" -> parseGroupMembers(parser, members) - "minServerLoad" -> targetLoad = parser.doubleValue - "performanceScore" -> score = parser.doubleValue - } - } - - this.members = members - this.targetLoad = targetLoad - this.score = score - } - - /** - * Parse the members of a group. - */ - private fun parseGroupMembers( - parser: JsonParser, - members: MutableSet<String>, - ) { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_ARRAY) { - if (parser.currentToken() != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected string value for group member") - } - - members.add(parser.text) - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt deleted file mode 100644 index b3286a1c..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc - -import com.fasterxml.jackson.core.JsonGenerator -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableWriter] implementation for the OpenDC VM interference JSON format. - */ -internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter { - /** - * A flag to indicate whether a row has been started. - */ - private var isRowActive = false - - init { - generator.writeStartArray() - } - - override fun startRow() { - // Reset state - members = emptySet() - targetLoad = Double.POSITIVE_INFINITY - score = 1.0 - - // Mark row as active - isRowActive = true - } - - override fun endRow() { - check(isRowActive) { "No active row" } - - generator.writeStartObject() - generator.writeArrayFieldStart("vms") - for (member in members) { - generator.writeString(member) - } - generator.writeEndArray() - generator.writeNumberField("minServerLoad", targetLoad) - generator.writeNumberField("performanceScore", score) - generator.writeEndObject() - } - - override fun resolve(name: String): Int { - return when (name) { - INTERFERENCE_GROUP_MEMBERS -> colMembers - INTERFERENCE_GROUP_TARGET -> colTarget - INTERFERENCE_GROUP_SCORE -> colScore - else -> -1 - } - } - - override fun setBoolean( - index: Int, - value: Boolean, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setInt( - index: Int, - value: Int, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setLong( - index: Int, - value: Long, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setFloat( - index: Int, - value: Float, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setDouble( - index: Int, - value: Double, - ) { - check(isRowActive) { "No active row" } - - when (index) { - colTarget -> targetLoad = (value as Number).toDouble() - colScore -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun setString( - index: Int, - value: String, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setUUID( - index: Int, - value: UUID, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setInstant( - index: Int, - value: Instant, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setDuration( - index: Int, - value: Duration, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> setList( - index: Int, - value: List<T>, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> setSet( - index: Int, - value: Set<T>, - ) { - check(isRowActive) { "No active row" } - - @Suppress("UNCHECKED_CAST") - when (index) { - colMembers -> members = value as Set<String> - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun <K, V> setMap( - index: Int, - value: Map<K, V>, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun flush() { - generator.flush() - } - - override fun close() { - generator.writeEndArray() - generator.close() - } - - private val colMembers = 0 - private val colTarget = 1 - private val colScore = 2 - - private var members = emptySet<String>() - private var targetLoad = Double.POSITIVE_INFINITY - private var score = 1.0 -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt deleted file mode 100644 index 74e880be..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc - -import com.fasterxml.jackson.core.JsonEncoding -import com.fasterxml.jackson.core.JsonFactory -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetFileWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.formats.opendc.parquet.ResourceReadSupport -import org.opendc.trace.formats.opendc.parquet.ResourceStateReadSupport -import org.opendc.trace.formats.opendc.parquet.ResourceStateWriteSupport -import org.opendc.trace.formats.opendc.parquet.ResourceWriteSupport -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.exists - -/** - * A [TraceFormat] implementation of the OpenDC virtual machine trace format. - */ -public class OdcVmTraceFormat : TraceFormat { - /** - * A [JsonFactory] that is used to parse the JSON-based interference model. - */ - private val jsonFactory = JsonFactory() - - /** - * The name of this trace format. - */ - override val name: String = "opendc-vm" - - override fun create(path: Path) { - // Construct directory containing the trace files - Files.createDirectories(path) - - val tables = getTables(path) - - for (table in tables) { - val writer = newWriter(path, table) - writer.close() - } - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceSubmissionTime, TableColumnType.Instant), - TableColumn(resourceDuration, TableColumnType.Long), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceCpuCapacity, TableColumnType.Double), - TableColumn(resourceMemCapacity, TableColumnType.Double), - TableColumn(resourceGpuCount, TableColumnType.Int), - TableColumn(resourceGpuCapacity, TableColumnType.Double), - TableColumn(resourceParents, TableColumnType.Set(TableColumnType.String)), - TableColumn(resourceChildren, TableColumnType.Set(TableColumnType.String)), - TableColumn(resourceNature, TableColumnType.String), - TableColumn(resourceDeadline, TableColumnType.Long), - ), - ) - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceStateDuration, TableColumnType.Duration), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceStateCpuUsage, TableColumnType.Double), - ), - ) - TABLE_INTERFERENCE_GROUPS -> - TableDetails( - listOf( - TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), - TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), - TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("tasks.parquet"), ResourceReadSupport(projection)) - OdcVmResourceTableReader(reader) - } - TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("fragments.parquet"), ResourceStateReadSupport(projection)) - OdcVmResourceStateTableReader(reader) - } - TABLE_INTERFERENCE_GROUPS -> { - val modelPath = path.resolve("interference-model.json") - val parser = - if (modelPath.exists()) { - jsonFactory.createParser(modelPath.toFile()) - } else { - jsonFactory.createParser("[]") // If model does not exist, return empty model - } - - OdcVmInterferenceJsonTableReader(parser) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - return when (table) { - TABLE_RESOURCES -> { - val writer = - LocalParquetWriter.builder(path.resolve("tasks.parquet"), ResourceWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() - OdcVmResourceTableWriter(writer) - } - TABLE_RESOURCE_STATES -> { - val writer = - LocalParquetWriter.builder(path.resolve("fragments.parquet"), ResourceStateWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() - OdcVmResourceStateTableWriter(writer) - } - TABLE_INTERFERENCE_GROUPS -> { - val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) - OdcVmInterferenceJsonTableWriter(generator) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt deleted file mode 100644 index cd2ccef7..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Type -import org.apache.parquet.schema.Types -import org.opendc.trace.TableColumn -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceSubmissionTime - -/** - * A [ReadSupport] instance for [Resource] objects. - */ -internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() { - /** - * Mapping from field names to [TableColumn]s. - */ - private val fieldMap = - mapOf( - "id" to resourceID, - "submissionTime" to resourceSubmissionTime, - "submission_time" to resourceSubmissionTime, - "duration" to resourceDuration, - "maxCores" to resourceCpuCount, - "cpu_count" to resourceCpuCount, - "cpu_capacity" to resourceCpuCapacity, - "requiredMemory" to resourceMemCapacity, - "mem_capacity" to resourceMemCapacity, - "gpu_count" to resourceGpuCount, - "gpu_capacity" to resourceGpuCapacity, - "parents" to resourceParents, - "children" to resourceChildren, - "nature" to resourceNature, - "deadline" to resourceDeadline, - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val projectionSet = projection.toSet() - - for (field in READ_SCHEMA.fields) { - val col = fieldMap[field.name] ?: continue - if (col in projectionSet) { - addField(field) - } - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema (version 2.0) for the "resources" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_0: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submissionTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("maxCores"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("requiredMemory"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("nature"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("deadline"), - ) - .named("resource") - - /** - * Parquet read schema (version 2.1) for the "resources" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_2: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submission_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("gpu_count"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("gpu_capacity"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField( - Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("element"), - ) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField( - Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("element"), - ) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("nature"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("deadline"), - ) - .named("resource") - - /** - * Parquet read schema for the "resources" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - READ_SCHEMA_V2_2 - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt deleted file mode 100644 index 53e594de..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.trace.TableColumn -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp - -/** - * A [ReadSupport] instance for [ResourceState] objects. - */ -internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() { - /** - * Mapping from field names to [TableColumn]s. - */ - private val fieldMap = - mapOf( - "id" to resourceID, - "time" to resourceStateTimestamp, - "timestamp" to resourceStateTimestamp, - "duration" to resourceStateDuration, - "cores" to resourceCpuCount, - "cpu_count" to resourceCpuCount, - "cpuUsage" to resourceStateCpuUsage, - "cpu_usage" to resourceStateCpuUsage, - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val projectionSet = projection.toSet() - - for (field in READ_SCHEMA.fields) { - val col = fieldMap[field.name] ?: continue - if (col in projectionSet) { - addField(field) - } - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema (version 2.0) for the "resource states" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_0: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cores"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpuUsage"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("gpuCount"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("gpuUsage"), - ) - .named("resource_state") - - /** - * Parquet read schema (version 2.1) for the "resource states" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_1: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("gpu_count"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("gpu_usage"), - ) - .named("resource_state") - - /** - * Parquet read schema for the "resource states" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1) - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt deleted file mode 100644 index 5a79fd6f..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_STATUS -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import java.io.BufferedReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the SWF format. - */ -internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader { - /** - * A flag to indicate the state of the reader - */ - private var state = State.Pending - - /** - * The current row. - */ - private var fields = emptyList<String>() - - /** - * A [Regex] object to match whitespace. - */ - private val whitespace = "\\s+".toRegex() - - override fun nextRow(): Boolean { - var line: String? - var num = 0 - - val state = state - if (state == State.Closed) { - return false - } else if (state == State.Pending) { - this.state = State.Active - } - - while (true) { - line = reader.readLine() - - if (line == null) { - this.state = State.Closed - return false - } - num++ - - if (line.isBlank()) { - // Ignore empty lines - continue - } else if (line.startsWith(";")) { - // Ignore comments for now - continue - } - - break - } - - fields = line!!.trim().split(whitespace) - - if (fields.size < 18) { - throw IllegalArgumentException("Invalid format at line $line") - } - - return true - } - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colJobID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_WAIT_TIME -> colWaitTime - TASK_RUNTIME -> colRunTime - TASK_ALLOC_NCPUS -> colAllocNcpus - TASK_REQ_NCPUS -> colReqNcpus - TASK_STATUS -> colStatus - TASK_USER_ID -> colUserID - TASK_GROUP_ID -> colGroupID - TASK_PARENTS -> colParentJob - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in colJobID..colParentThinkTime) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - check(state == State.Active) { "No active row" } - return when (index) { - colReqNcpus, colAllocNcpus, colStatus, colGroupID, colUserID -> fields[index].toInt(10) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String { - check(state == State.Active) { "No active row" } - return when (index) { - colJobID -> fields[index] - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - check(state == State.Active) { "No active row" } - return when (index) { - colSubmitTime -> Instant.ofEpochSecond(fields[index].toLong(10)) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - check(state == State.Active) { "No active row" } - return when (index) { - colWaitTime, colRunTime -> Duration.ofSeconds(fields[index].toLong(10)) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - check(state == State.Active) { "No active row" } - @Suppress("UNCHECKED_CAST") - return when (index) { - colParentJob -> { - require(elementType.isAssignableFrom(String::class.java)) - val parent = fields[index].toLong(10) - if (parent < 0) emptySet() else setOf(parent) - } - else -> throw IllegalArgumentException("Invalid column") - } as Set<T>? - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - state = State.Closed - } - - /** - * Default column indices for the SWF format. - */ - private val colJobID = 0 - private val colSubmitTime = 1 - private val colWaitTime = 2 - private val colRunTime = 3 - private val colAllocNcpus = 4 - private val colAvgCpuTime = 5 - private val colUsedMem = 6 - private val colReqNcpus = 7 - private val colReqTime = 8 - private val colReqMem = 9 - private val colStatus = 10 - private val colUserID = 11 - private val colGroupID = 12 - private val colExecNum = 13 - private val colQueueNum = 14 - private val colPartNum = 15 - private val colParentJob = 16 - private val colParentThinkTime = 17 - - private enum class State { - Pending, - Active, - Closed, - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt deleted file mode 100644 index d59b07b4..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_STATUS -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import java.nio.file.Path -import kotlin.io.path.bufferedReader - -/** - * Support for the Standard Workload Format (SWF) in OpenDC. - * - * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html - */ -public class SwfTraceFormat : TraceFormat { - override val name: String = "swf" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_STATUS, TableColumnType.Int), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt deleted file mode 100644 index 8f84e51f..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import java.time.Duration -import java.time.Instant -import java.util.UUID -import kotlin.math.roundToInt - -/** - * A [TableReader] implementation for the WfCommons workload trace format. - */ -internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader { - /** - * The current nesting of the parser. - */ - private var level: ParserLevel = ParserLevel.TOP - - override fun nextRow(): Boolean { - reset() - - var hasJob = false - - while (!hasJob) { - when (level) { - ParserLevel.TOP -> { - val token = parser.nextToken() - - // Check whether the document is not empty and starts with an object - if (token == null) { - parser.close() - break - } else if (token != JsonToken.START_OBJECT) { - throw JsonParseException(parser, "Expected object", parser.currentLocation) - } else { - level = ParserLevel.TRACE - } - } - ParserLevel.TRACE -> { - // Seek for the workflow object in the file - if (!seekWorkflow()) { - parser.close() - break - } else if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object", parser.currentLocation) - } else { - level = ParserLevel.WORKFLOW - } - } - ParserLevel.WORKFLOW -> { - // Seek for the jobs object in the file - level = - if (!seekJobs()) { - ParserLevel.TRACE - } else if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array", parser.currentLocation) - } else { - ParserLevel.JOB - } - } - ParserLevel.JOB -> { - when (parser.nextToken()) { - JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW - JsonToken.START_OBJECT -> { - parseJob() - hasJob = true - break - } - else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation) - } - } - } - } - - return hasJob - } - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_RUNTIME -> colRuntime - TASK_REQ_NCPUS -> colNproc - TASK_PARENTS -> colParents - TASK_CHILDREN -> colChildren - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colChildren) { "Invalid column value" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colNproc -> cores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String? { - checkActive() - return when (index) { - colID -> id - colWorkflowID -> workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - throw IllegalArgumentException("Invalid column") - } - - override fun getDuration(index: Int): Duration? { - checkActive() - return when (index) { - colRuntime -> runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - checkActive() - return when (index) { - colParents -> typeParents.convertTo(parents, elementType) - colChildren -> typeChildren.convertTo(children, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Parse the trace and seek until the workflow description. - */ - private fun seekWorkflow(): Boolean { - while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) { - when (parser.currentName) { - "name" -> workflowId = parser.text - "workflow" -> return true - else -> parser.skipChildren() - } - } - - return false - } - - /** - * Parse the workflow description in the file and seek until the first job. - */ - private fun seekJobs(): Boolean { - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "jobs" -> return true - else -> parser.skipChildren() - } - } - - return false - } - - /** - * Parse a single job in the file. - */ - private fun parseJob() { - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "name" -> id = parser.text - "parents" -> parents = parseIds() - "children" -> children = parseIds() - "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong()) - "cores" -> cores = parser.floatValue.roundToInt() - else -> parser.skipChildren() - } - } - } - - /** - * Parse the parents/children of a job. - */ - private fun parseIds(): Set<String> { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array", parser.currentLocation) - } - - val ids = mutableSetOf<String>() - - while (parser.nextToken() != JsonToken.END_ARRAY) { - if (parser.currentToken != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected token", parser.currentLocation) - } - - ids.add(parser.valueAsString) - } - - return ids - } - - private enum class ParserLevel { - TOP, - TRACE, - WORKFLOW, - JOB, - } - - /** - * State fields for the parser. - */ - private var id: String? = null - private var workflowId: String? = null - private var runtime: Duration? = null - private var parents: Set<String>? = null - private var children: Set<String>? = null - private var cores = -1 - - private fun reset() { - id = null - runtime = null - parents = null - children = null - cores = -1 - } - - private val colID = 0 - private val colWorkflowID = 1 - private val colRuntime = 3 - private val colNproc = 4 - private val colParents = 5 - private val colChildren = 6 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt deleted file mode 100644 index 2178fac6..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the WfCommons workload trace format. - */ -public class WfFormatTraceFormat : TraceFormat { - /** - * The [JsonFactory] that is used to created JSON parsers. - */ - private val factory = JsonFactory() - - override val name: String = "wfformat" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt index d474e0ec..947746c6 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt @@ -20,17 +20,14 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc +package org.opendc.trace.formats.workload import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateGpuUsage -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.formats.opendc.parquet.ResourceState +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.FRAGMENT_GPU_USAGE +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.formats.workload.parquet.Fragment import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant @@ -39,11 +36,11 @@ import java.util.UUID /** * A [TableReader] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader { +internal class FragmentTableReader(private val reader: LocalParquetReader<Fragment>) : TableReader { /** * The current record. */ - private var record: ResourceState? = null + private var record: Fragment? = null override fun nextRow(): Boolean { try { @@ -58,23 +55,16 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } private val colID = 0 - private val colTimestamp = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuUsage = 4 - private val colGpuCount = 5 - private val colGpuUsage = 6 - private val colMemoryCapacity = 7 + private val colDuration = 1 + private val colCpuUsage = 2 + private val colGpuUsage = 3 override fun resolve(name: String): Int { return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceStateDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceStateCpuUsage -> colCpuUsage - resourceGpuCount -> colGpuCount - resourceStateGpuUsage -> colGpuUsage + TASK_ID -> colID + FRAGMENT_DURATION -> colDuration + FRAGMENT_CPU_USAGE -> colCpuUsage + FRAGMENT_GPU_USAGE -> colGpuUsage else -> -1 } } @@ -91,8 +81,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - colCpuCount -> record.cpuCount - colGpuCount -> record.gpuCount + colID -> record.id else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -115,12 +104,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colID -> record.id - else -> throw IllegalArgumentException("Invalid column index $index") - } + throw IllegalArgumentException("Invalid column index $index") } override fun getUUID(index: Int): UUID? { @@ -128,12 +112,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colTimestamp -> record.timestamp - else -> throw IllegalArgumentException("Invalid column index $index") - } + throw IllegalArgumentException("Invalid column index $index") } override fun getDuration(index: Int): Duration { diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt index c6f117d2..33cd9e17 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt @@ -20,18 +20,15 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc +package org.opendc.trace.formats.workload import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateGpuUsage -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.formats.opendc.parquet.ResourceState +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.FRAGMENT_GPU_USAGE +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.formats.workload.parquet.Fragment import java.time.Duration import java.time.Instant import java.util.UUID @@ -39,27 +36,21 @@ import java.util.UUID /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter { +internal class FragmentTableWriter(private val writer: ParquetWriter<Fragment>) : TableWriter { /** * The current state for the record that is being written. */ private var localIsActive = false - private var localID: String = "" - private var localTimestamp: Instant = Instant.MIN + private var localID: Int = -99 private var localDuration: Duration = Duration.ZERO - private var localCpuCount: Int = 0 private var localCpuUsage: Double = Double.NaN - private var localGpuCount: Int = 0 private var localGpuUsage: Double = Double.NaN override fun startRow() { localIsActive = true - localID = "" - localTimestamp = Instant.MIN + localID = -99 localDuration = Duration.ZERO - localCpuCount = 0 localCpuUsage = Double.NaN - localGpuCount = 0 localGpuUsage = Double.NaN } @@ -67,23 +58,19 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R check(localIsActive) { "No active row" } localIsActive = false - check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + check(lastId != localID) { "Records need to be ordered by (id, timestamp)" } - writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage, localGpuCount, localGpuUsage)) + writer.write(Fragment(localID, localDuration, localCpuUsage, localGpuUsage)) lastId = localID - lastTimestamp = localTimestamp } override fun resolve(name: String): Int { return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceStateDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceStateCpuUsage -> colCpuUsage - resourceGpuCount -> colGpuCount - resourceStateGpuUsage -> colGpuUsage + TASK_ID -> colID + FRAGMENT_DURATION -> colDuration + FRAGMENT_CPU_USAGE -> colCpuUsage + FRAGMENT_GPU_USAGE -> colGpuUsage else -> -1 } } @@ -101,8 +88,7 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R ) { check(localIsActive) { "No active row" } when (index) { - colCpuCount -> localCpuCount = value - colGpuCount -> localGpuCount = value + colID -> localID = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -137,12 +123,7 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R index: Int, value: String, ) { - check(localIsActive) { "No active row" } - - when (index) { - colID -> localID = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun setUUID( @@ -156,12 +137,7 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R index: Int, value: Instant, ) { - check(localIsActive) { "No active row" } - - when (index) { - colTimestamp -> localTimestamp = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun setDuration( @@ -208,14 +184,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R /** * Last column values that are used to check for correct partitioning. */ - private var lastId: String? = null - private var lastTimestamp: Instant = Instant.MAX + private var lastId: Int? = null private val colID = 0 - private val colTimestamp = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuUsage = 4 - private val colGpuCount = 5 - private val colGpuUsage = 6 + private val colDuration = 1 + private val colCpuUsage = 2 + private val colGpuUsage = 3 } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt index 495a5d75..6c700b0c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt @@ -20,23 +20,24 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc +package org.opendc.trace.formats.workload import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.formats.opendc.parquet.Resource +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NAME +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME +import org.opendc.trace.formats.workload.parquet.Task import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration @@ -46,11 +47,11 @@ import java.util.UUID /** * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. */ -internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader { +internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader { /** * The current record. */ - private var record: Resource? = null + private var record: Task? = null override fun nextRow(): Boolean { try { @@ -65,35 +66,37 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } private val colID = 0 - private val colSubmissionTime = 1 - private val colDurationTime = 2 - private val colCpuCount = 3 - private val colCpuCapacity = 4 - private val colMemCapacity = 5 - private val colGpuCapacity = 6 - private val colGpuCount = 7 - private val colParents = 8 - private val colChildren = 9 - private val colNature = 10 - private val colDeadline = 11 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) + private val colName = 1 + private val colSubmissionTime = 2 + private val colDurationTime = 3 + private val colCpuCount = 4 + private val colCpuCapacity = 5 + private val colMemCapacity = 6 + private val colGpuCapacity = 7 + private val colGpuCount = 8 + private val colParents = 9 + private val colChildren = 10 + private val colNature = 11 + private val colDeadline = 12 + + private val typeParents = TableColumnType.Set(TableColumnType.Int) + private val typeChildren = TableColumnType.Set(TableColumnType.Int) override fun resolve(name: String): Int { return when (name) { - resourceID -> colID - resourceSubmissionTime -> colSubmissionTime - resourceDuration -> colDurationTime - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceMemCapacity -> colMemCapacity - resourceGpuCount -> colGpuCount - resourceGpuCapacity -> colGpuCapacity - resourceParents -> colParents - resourceChildren -> colChildren - resourceNature -> colNature - resourceDeadline -> colDeadline + TASK_ID -> colID + TASK_NAME -> colName + TASK_SUBMISSION_TIME -> colSubmissionTime + TASK_DURATION -> colDurationTime + TASK_CPU_COUNT -> colCpuCount + TASK_CPU_CAPACITY -> colCpuCapacity + TASK_MEM_CAPACITY -> colMemCapacity + TASK_GPU_COUNT -> colGpuCount + TASK_GPU_CAPACITY -> colGpuCapacity + TASK_PARENTS -> colParents + TASK_CHILDREN -> colChildren + TASK_NATURE -> colNature + TASK_DEADLINE -> colDeadline else -> -1 } } @@ -117,6 +120,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { + colID -> record.id colCpuCount -> record.cpuCount colGpuCount -> record.gpuCount else -> throw IllegalArgumentException("Invalid column") @@ -151,7 +155,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - colID -> record.id + colName -> record.name colNature -> record.nature else -> throw IllegalArgumentException("Invalid column") } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt index 022e288a..39be36c1 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt @@ -20,23 +20,24 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc +package org.opendc.trace.formats.workload import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.formats.opendc.parquet.Resource +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NAME +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME +import org.opendc.trace.formats.workload.parquet.Task import java.time.Duration import java.time.Instant import java.util.UUID @@ -44,12 +45,13 @@ import java.util.UUID /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter { +internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableWriter { /** * The current state for the record that is being written. */ private var localIsActive = false - private var localId: String = "" + private var localId: Int = -99 + private var localName: String = "" private var localSubmissionTime: Instant = Instant.MIN private var localDuration: Long = 0L private var localCpuCount: Int = 0 @@ -57,14 +59,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour private var localMemCapacity: Double = Double.NaN private var localGpuCount: Int = 0 private var localGpuCapacity: Double = Double.NaN - private var localParents = mutableSetOf<String>() - private var localChildren = mutableSetOf<String>() + private var localParents = mutableSetOf<Int>() + private var localChildren = mutableSetOf<Int>() private var localNature: String? = null private var localDeadline: Long = -1 override fun startRow() { localIsActive = true - localId = "" + localId = -99 + localName = "" localSubmissionTime = Instant.MIN localDuration = 0L localCpuCount = 0 @@ -82,8 +85,9 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour check(localIsActive) { "No active row" } localIsActive = false writer.write( - Resource( + Task( localId, + localName, localSubmissionTime, localDuration, localCpuCount, @@ -101,18 +105,19 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour override fun resolve(name: String): Int { return when (name) { - resourceID -> colID - resourceSubmissionTime -> colSubmissionTime - resourceDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceMemCapacity -> colMemCapacity - resourceGpuCount -> colGpuCount - resourceGpuCapacity -> colGpuCapacity - resourceParents -> colParents - resourceChildren -> colChildren - resourceNature -> colNature - resourceDeadline -> colDeadline + TASK_ID -> colID + TASK_NAME -> colID + TASK_SUBMISSION_TIME -> colSubmissionTime + TASK_DURATION -> colDuration + TASK_CPU_COUNT -> colCpuCount + TASK_CPU_CAPACITY -> colCpuCapacity + TASK_MEM_CAPACITY -> colMemCapacity + TASK_GPU_COUNT -> colGpuCount + TASK_GPU_CAPACITY -> colGpuCapacity + TASK_PARENTS -> colParents + TASK_CHILDREN -> colChildren + TASK_NATURE -> colNature + TASK_DEADLINE -> colDeadline else -> -1 } } @@ -130,6 +135,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour ) { check(localIsActive) { "No active row" } when (index) { + colID -> localId = value colCpuCount -> localCpuCount = value colGpuCount -> localGpuCount = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") @@ -174,7 +180,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour ) { check(localIsActive) { "No active row" } when (index) { - colID -> localId = value + colName -> localName = value colNature -> localNature = value else -> throw IllegalArgumentException("Invalid column index $index") } @@ -235,15 +241,16 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour } private val colID = 0 - private val colSubmissionTime = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuCapacity = 4 - private val colMemCapacity = 5 - private val colGpuCount = 6 - private val colGpuCapacity = 7 - private val colParents = 8 - private val colChildren = 9 - private val colNature = 10 - private val colDeadline = 11 + private val colName = 1 + private val colSubmissionTime = 2 + private val colDuration = 3 + private val colCpuCount = 4 + private val colCpuCapacity = 5 + private val colMemCapacity = 6 + private val colGpuCount = 7 + private val colGpuCapacity = 8 + private val colParents = 9 + private val colChildren = 10 + private val colNature = 11 + private val colDeadline = 12 } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt new file mode 100644 index 00000000..7af0650e --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload + +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.TABLE_FRAGMENTS +import org.opendc.trace.conv.TABLE_TASKS +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME +import org.opendc.trace.formats.workload.parquet.FragmentReadSupport +import org.opendc.trace.formats.workload.parquet.FragmentWriteSupport +import org.opendc.trace.formats.workload.parquet.TaskReadSupport +import org.opendc.trace.formats.workload.parquet.TaskWriteSupport +import org.opendc.trace.spi.TableDetails +import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.nio.file.Files +import java.nio.file.Path + +/** + * A [TraceFormat] implementation of the OpenDC virtual machine trace format. + */ +public class WorkloadTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "workload" + + override fun create(path: Path) { + // Construct directory containing the trace files + Files.createDirectories(path) + + val tables = getTables(path) + + for (table in tables) { + val writer = newWriter(path, table) + writer.close() + } + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS, TABLE_FRAGMENTS) + + override fun getDetails( + path: Path, + table: String, + ): TableDetails { + return when (table) { + TABLE_TASKS -> + TableDetails( + listOf( + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMISSION_TIME, TableColumnType.Instant), + TableColumn(TASK_DURATION, TableColumnType.Long), + TableColumn(TASK_CPU_COUNT, TableColumnType.Int), + TableColumn(TASK_CPU_CAPACITY, TableColumnType.Double), + TableColumn(TASK_MEM_CAPACITY, TableColumnType.Double), + TableColumn(TASK_GPU_COUNT, TableColumnType.Int), + TableColumn(TASK_GPU_CAPACITY, TableColumnType.Double), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_NATURE, TableColumnType.String), + TableColumn(TASK_DEADLINE, TableColumnType.Long), + ), + ) + TABLE_FRAGMENTS -> + TableDetails( + listOf( + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(FRAGMENT_DURATION, TableColumnType.Duration), + TableColumn(TASK_CPU_COUNT, TableColumnType.Int), + TableColumn(FRAGMENT_CPU_USAGE, TableColumnType.Double), + ), + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { + return when (table) { + TABLE_TASKS -> { + val reader = LocalParquetReader(path.resolve("tasks.parquet"), TaskReadSupport(projection)) + TaskTableReader(reader) + } + TABLE_FRAGMENTS -> { + val reader = LocalParquetReader(path.resolve("fragments.parquet"), FragmentReadSupport(projection)) + FragmentTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter( + path: Path, + table: String, + ): TableWriter { + return when (table) { + TABLE_TASKS -> { + val writer = + LocalParquetWriter.builder(path.resolve("tasks.parquet"), TaskWriteSupport()) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + TaskTableWriter(writer) + } + TABLE_FRAGMENTS -> { + val writer = + LocalParquetWriter.builder(path.resolve("fragments.parquet"), FragmentWriteSupport()) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withDictionaryEncoding("id", true) + .withBloomFilterEnabled("id", true) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + FragmentTableWriter(writer) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt index 10fc6be4..44385088 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt @@ -20,17 +20,13 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc.parquet +package org.opendc.trace.formats.workload.parquet import java.time.Duration -import java.time.Instant -internal class ResourceState( - val id: String, - val timestamp: Instant, +internal class Fragment( + val id: Int, val duration: Duration, - val cpuCount: Int, val cpuUsage: Double, - val gpuCount: Int, val gpuUsage: Double, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt new file mode 100644 index 00000000..3fa914bc --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.Types +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.TASK_ID + +/** + * A [ReadSupport] instance for [Fragment] objects. + */ +internal class FragmentReadSupport(private val projection: List<String>?) : ReadSupport<Fragment>() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = + mapOf( + "id" to TASK_ID, + "duration" to FRAGMENT_DURATION, + "cpuUsage" to FRAGMENT_CPU_USAGE, + "cpu_usage" to FRAGMENT_CPU_USAGE, + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in FRAGMENT_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(FRAGMENT_SCHEMA.name) + } else { + FRAGMENT_SCHEMA + } + + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer<Fragment> = FragmentRecordMaterializer(readContext.requestedSchema) +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt index ee5e56aa..7902cab1 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt @@ -20,9 +20,8 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc.parquet +package org.opendc.trace.formats.workload.parquet -import org.apache.parquet.io.api.Binary import org.apache.parquet.io.api.Converter import org.apache.parquet.io.api.GroupConverter import org.apache.parquet.io.api.PrimitiveConverter @@ -32,13 +31,13 @@ import java.time.Duration import java.time.Instant /** - * A [RecordMaterializer] for [ResourceState] records. + * A [RecordMaterializer] for [Fragment] records. */ -internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() { +internal class FragmentRecordMaterializer(schema: MessageType) : RecordMaterializer<Fragment>() { /** * State of current record being read. */ - private var localId = "" + private var localId = -99 private var localTimestamp = Instant.MIN private var localDuration = Duration.ZERO private var localCpuCount = 0 @@ -59,8 +58,8 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate when (type.name) { "id" -> object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - localId = value.toStringUsingUTF8() + override fun addInt(value: Int) { + localId = value } } "timestamp", "time" -> @@ -104,8 +103,7 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate } override fun start() { - localId = "" - localTimestamp = Instant.MIN + localId = -99 localDuration = Duration.ZERO localCpuCount = 0 localCpuUsage = 0.0 @@ -118,14 +116,11 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] } - override fun getCurrentRecord(): ResourceState = - ResourceState( + override fun getCurrentRecord(): Fragment = + Fragment( localId, - localTimestamp, localDuration, - localCpuCount, localCpuUsage, - localGpuCount, localGpuUsage, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt new file mode 100644 index 00000000..cd499e7e --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types + +private val FRAGMENT_SCHEMA_v1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_usage"), + ) + .named("resource_state") + +private val FRAGMENT_SCHEMA_v2: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_usage"), + ) + .named("resource_state") + +/** + * Parquet read schema for the "resource states" table in the trace. + */ +public val FRAGMENT_SCHEMA: MessageType = FRAGMENT_SCHEMA_v2 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt index 58c43916..e6b7ba4f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt @@ -20,11 +20,10 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc.parquet +package org.opendc.trace.formats.workload.parquet import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.MessageType @@ -32,9 +31,9 @@ import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types /** - * Support for writing [Resource] instances to Parquet format. + * Support for writing [Task] instances to Parquet format. */ -internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() { +internal class FragmentWriteSupport : WriteSupport<Fragment>() { /** * The current active record consumer. */ @@ -48,32 +47,24 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() { this.recordConsumer = recordConsumer } - override fun write(record: ResourceState) { + override fun write(record: Fragment) { write(recordConsumer, record) } private fun write( consumer: RecordConsumer, - record: ResourceState, + record: Fragment, ) { consumer.startMessage() consumer.startField("id", 0) - consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.addInteger(record.id) consumer.endField("id", 0) - consumer.startField("timestamp", 1) - consumer.addLong(record.timestamp.toEpochMilli()) - consumer.endField("timestamp", 1) - consumer.startField("duration", 2) consumer.addLong(record.duration.toMillis()) consumer.endField("duration", 2) - consumer.startField("cpu_count", 3) - consumer.addInteger(record.cpuCount) - consumer.endField("cpu_count", 3) - consumer.startField("cpu_usage", 4) consumer.addDouble(record.cpuUsage) consumer.endField("cpu_usage", 4) @@ -101,9 +92,6 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() { .required(PrimitiveType.PrimitiveTypeName.INT64) .named("duration"), Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types .required(PrimitiveType.PrimitiveTypeName.DOUBLE) .named("cpu_usage"), ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt index d727920a..f661d5a9 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt @@ -20,15 +20,16 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc.parquet +package org.opendc.trace.formats.workload.parquet import java.time.Instant /** * A description of a resource in a trace. */ -internal data class Resource( - val id: String, +internal data class Task( + val id: Int, + val name: String, val submissionTime: Instant, val durationTime: Long, val cpuCount: Int, @@ -36,8 +37,8 @@ internal data class Resource( val memCapacity: Double, val gpuCount: Int = 0, val gpuCapacity: Double = 0.0, - val parents: Set<String> = emptySet(), - val children: Set<String> = emptySet(), + val parents: Set<Int> = emptySet(), + val children: Set<Int> = emptySet(), val nature: String? = null, val deadline: Long = -1, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt new file mode 100644 index 00000000..4bbb18ac --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.Types +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NAME +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME + +/** + * A [ReadSupport] instance for [Task] objects. + */ +internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = + mapOf( + "id" to TASK_ID, + "name" to TASK_NAME, + "submissionTime" to TASK_SUBMISSION_TIME, + "submission_time" to TASK_SUBMISSION_TIME, + "duration" to TASK_DURATION, + "maxCores" to TASK_CPU_COUNT, + "cpu_count" to TASK_CPU_COUNT, + "cpu_capacity" to TASK_CPU_CAPACITY, + "requiredMemory" to TASK_MEM_CAPACITY, + "mem_capacity" to TASK_MEM_CAPACITY, + "gpu_count" to TASK_GPU_COUNT, + "gpu_capacity" to TASK_GPU_CAPACITY, + "parents" to TASK_PARENTS, + "children" to TASK_CHILDREN, + "nature" to TASK_NATURE, + "deadline" to TASK_DEADLINE, + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in TASK_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(TASK_SCHEMA.name) + } else { + TASK_SCHEMA + } + + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema) +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt index f9493721..12dc54b7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc.parquet +package org.opendc.trace.formats.workload.parquet import org.apache.parquet.io.api.Binary import org.apache.parquet.io.api.Converter @@ -31,13 +31,14 @@ import org.apache.parquet.schema.MessageType import java.time.Instant /** - * A [RecordMaterializer] for [Resource] records. + * A [RecordMaterializer] for [Task] records. */ -internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() { +internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() { /** * State of current record being read. */ - private var localId = "" + private var localId = -99 + private var localName = "" private var localSubmissionTime = Instant.MIN private var localDuration = 0L private var localCpuCount = 0 @@ -45,8 +46,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali private var localMemCapacity = 0.0 private var localGpuCount = 0 private var localGpuCapacity = 0.0 - private var localParents = mutableSetOf<String>() - private var localChildren = mutableSetOf<String>() + private var localParents = mutableSetOf<Int>() + private var localChildren = mutableSetOf<Int>() private var localNature: String? = null private var localDeadline = -1L @@ -63,8 +64,14 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali when (type.name) { "id" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + localId = value + } + } + "name" -> + object : PrimitiveConverter() { override fun addBinary(value: Binary) { - localId = value.toStringUsingUTF8() + localName = value.toStringUsingUTF8() } } "submission_time", "submissionTime" -> @@ -132,7 +139,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali } override fun start() { - localId = "" + localId = -99 + localName = "" localSubmissionTime = Instant.MIN localDuration = 0L localCpuCount = 0 @@ -151,9 +159,10 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] } - override fun getCurrentRecord(): Resource = - Resource( + override fun getCurrentRecord(): Task = + Task( localId, + localName, localSubmissionTime, localDuration, localCpuCount, @@ -172,12 +181,11 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali /** * Helper class to convert parent and child relations and add them to [relations]. */ - private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() { + private class RelationConverter(private val relations: MutableSet<Int>) : GroupConverter() { private val entryConverter = object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - val str = value.toStringUsingUTF8() - relations.add(str) + override fun addInt(value: Int) { + relations.add(value) } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt new file mode 100644 index 00000000..f7f5e953 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Types + +private val TASK_SCHEMA_V1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submission_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_capacity"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), + ) + .named("resource") + +private val TASK_SCHEMA_V2: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("name"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submission_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_capacity"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional( + PrimitiveType.PrimitiveTypeName.INT32, + ) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional( + PrimitiveType.PrimitiveTypeName.INT32, + ) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), + ) + .named("resource") + +public val TASK_SCHEMA: MessageType = TASK_SCHEMA_V2 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt index c3e984fb..a7ce62b8 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt @@ -20,47 +20,43 @@ * SOFTWARE. */ -package org.opendc.trace.formats.opendc.parquet +package org.opendc.trace.formats.workload.parquet import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.Binary import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types import kotlin.math.roundToLong /** - * Support for writing [Resource] instances to Parquet format. + * Support for writing [Task] instances to Parquet format. */ -internal class ResourceWriteSupport : WriteSupport<Resource>() { +internal class TaskWriteSupport : WriteSupport<Task>() { /** * The current active record consumer. */ private lateinit var recordConsumer: RecordConsumer override fun init(configuration: Configuration): WriteContext { - return WriteContext(WRITE_SCHEMA, emptyMap()) + return WriteContext(TASK_SCHEMA, emptyMap()) } override fun prepareForWrite(recordConsumer: RecordConsumer) { this.recordConsumer = recordConsumer } - override fun write(record: Resource) { + override fun write(record: Task) { write(recordConsumer, record) } private fun write( consumer: RecordConsumer, - record: Resource, + record: Task, ) { consumer.startMessage() consumer.startField("id", 0) - consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.addInteger(record.id) consumer.endField("id", 0) consumer.startField("submission_time", 1) @@ -97,43 +93,4 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() { consumer.endMessage() } - - companion object { - /** - * Parquet schema for the "resources" table in the trace. - */ - @JvmStatic - val WRITE_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submission_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("nature"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("deadline"), - ) - .named("resource") - } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt deleted file mode 100644 index 95582388..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.Task -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the WTF format. - */ -internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader { - /** - * The current record. - */ - private var record: Task? = null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colWorkflowID = 1 - private val colSubmitTime = 2 - private val colWaitTime = 3 - private val colRuntime = 4 - private val colReqNcpus = 5 - private val colParents = 6 - private val colChildren = 7 - private val colGroupID = 8 - private val colUserID = 9 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_WAIT_TIME -> colWaitTime - TASK_RUNTIME -> colRuntime - TASK_REQ_NCPUS -> colReqNcpus - TASK_PARENTS -> colParents - TASK_CHILDREN -> colChildren - TASK_GROUP_ID -> colGroupID - TASK_USER_ID -> colUserID - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in colID..colUserID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colReqNcpus -> record.requestedCpus - colGroupID -> record.groupId - colUserID -> record.userId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colID -> record.id - colWorkflowID -> record.workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colSubmitTime -> record.submitTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colWaitTime -> record.waitTime - colRuntime -> record.runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colParents -> typeParents.convertTo(record.parents, elementType) - colChildren -> typeChildren.convertTo(record.children, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt deleted file mode 100644 index 1386d2ef..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.TaskReadSupport -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the Workflow Trace Format (WTF). - */ -public class WtfTraceFormat : TraceFormat { - override val name: String = "wtf" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false) - WtfTaskTableReader(reader) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt deleted file mode 100644 index a1db0cab..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import java.time.Duration -import java.time.Instant - -/** - * A task in the Workflow Trace Format. - */ -internal data class Task( - val id: String, - val workflowId: String, - val submitTime: Instant, - val waitTime: Duration, - val runtime: Duration, - val requestedCpus: Int, - val groupId: Int, - val userId: Int, - val parents: Set<String>, - val children: Set<String>, -) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt deleted file mode 100644 index 1f9c506d..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Type -import org.apache.parquet.schema.Types -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID - -/** - * A [ReadSupport] instance for [Task] objects. - * - * @param projection The projection of the table to read. - */ -internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() { - /** - * Mapping of table columns to their Parquet column names. - */ - private val colMap = - mapOf( - TASK_ID to "id", - TASK_WORKFLOW_ID to "workflow_id", - TASK_SUBMIT_TIME to "ts_submit", - TASK_WAIT_TIME to "wait_time", - TASK_RUNTIME to "runtime", - TASK_REQ_NCPUS to "resource_amount_requested", - TASK_PARENTS to "parents", - TASK_CHILDREN to "children", - TASK_GROUP_ID to "group_id", - TASK_USER_ID to "user_id", - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } - - for (col in projection) { - val fieldName = colMap[col] ?: continue - addField(fieldByName.getValue(fieldName)) - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("workflow_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("ts_submit"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("wait_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("runtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("resource_amount_requested"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("user_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("group_id"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents"), - ) - .named("task") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt deleted file mode 100644 index 412a4f8b..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Duration -import java.time.Instant -import kotlin.math.roundToInt -import kotlin.math.roundToLong - -/** - * A [RecordMaterializer] for [Task] records. - */ -internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() { - /** - * State of current record being read. - */ - private var localID = "" - private var localWorkflowID = "" - private var localSubmitTime = Instant.MIN - private var localWaitTime = Duration.ZERO - private var localRuntime = Duration.ZERO - private var localRequestedCpus = 0 - private var localGroupId = 0 - private var localUserId = 0 - private var localParents = mutableSetOf<String>() - private var localChildren = mutableSetOf<String>() - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localID = value.toString() - } - } - "workflow_id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWorkflowID = value.toString() - } - } - "ts_submit" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localSubmitTime = Instant.ofEpochMilli(value) - } - } - "wait_time" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWaitTime = Duration.ofMillis(value) - } - } - "runtime" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localRuntime = Duration.ofMillis(value) - } - } - "resource_amount_requested" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localRequestedCpus = value.roundToInt() - } - } - "group_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localGroupId = value - } - } - "user_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localUserId = value - } - } - "children" -> RelationConverter(localChildren) - "parents" -> RelationConverter(localParents) - else -> error("Unknown column $type") - } - } - - override fun start() { - localID = "" - localWorkflowID = "" - localSubmitTime = Instant.MIN - localWaitTime = Duration.ZERO - localRuntime = Duration.ZERO - localRequestedCpus = 0 - localGroupId = 0 - localUserId = 0 - localParents.clear() - localChildren.clear() - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): Task = - Task( - localID, - localWorkflowID, - localSubmitTime, - localWaitTime, - localRuntime, - localRequestedCpus, - localGroupId, - localUserId, - localParents.toSet(), - localChildren.toSet(), - ) - - override fun getRootConverter(): GroupConverter = root - - /** - * Helper class to convert parent and child relations and add them to [relations]. - */ - private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() { - private val entryConverter = - object : PrimitiveConverter() { - override fun addLong(value: Long) { - relations.add(value.toString()) - } - - override fun addDouble(value: Double) { - relations.add(value.roundToLong().toString()) - } - } - - private val listConverter = - object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return entryConverter - } - - override fun start() {} - - override fun end() {} - } - - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return listConverter - } - - override fun start() {} - - override fun end() {} - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index e586f90a..945d8f2f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -24,15 +24,9 @@ package org.opendc.trace.spi import org.opendc.trace.TableReader import org.opendc.trace.TableWriter -import org.opendc.trace.azure.AzureTraceFormat -import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.formats.carbon.CarbonTraceFormat import org.opendc.trace.formats.failure.FailureTraceFormat -import org.opendc.trace.formats.opendc.OdcVmTraceFormat -import org.opendc.trace.gwf.GwfTraceFormat -import org.opendc.trace.swf.SwfTraceFormat -import org.opendc.trace.wfformat.WfFormatTraceFormat -import org.opendc.trace.wtf.WtfTraceFormat +import org.opendc.trace.formats.workload.WorkloadTraceFormat import java.nio.file.Path import java.util.ServiceLoader @@ -122,15 +116,9 @@ public interface TraceFormat { @JvmStatic public fun byName(name: String): TraceFormat? { return when (name) { - "azure" -> AzureTraceFormat() - "bitbrains" -> BitbrainsTraceFormat() "carbon" -> CarbonTraceFormat() "failure" -> FailureTraceFormat() - "gwf" -> GwfTraceFormat() - "opendc-vm" -> OdcVmTraceFormat() - "swf" -> SwfTraceFormat() - "wfformat" -> WfFormatTraceFormat() - "wtf" -> WtfTraceFormat() + "workload" -> WorkloadTraceFormat() else -> null } } |
