From e22c97dcca7478d6941b78bdf7cd873bc0d23cdc Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 22 Jul 2025 15:47:44 +0200 Subject: Updated workload schema (#360) --- .../kotlin/org/opendc/trace/conv/CarbonColumns.kt | 35 ++ .../opendc/trace/conv/CarbonIntensityColumns.kt | 35 -- .../org/opendc/trace/conv/FragmentColumns.kt | 45 +++ .../opendc/trace/conv/InterferenceGroupColumns.kt | 40 --- .../org/opendc/trace/conv/ResourceColumns.kt | 115 ------- .../org/opendc/trace/conv/ResourceStateColumns.kt | 103 ------ .../main/kotlin/org/opendc/trace/conv/Tables.kt | 19 +- .../kotlin/org/opendc/trace/conv/TaskColumns.kt | 56 ++-- .../formats/azure/AzureResourceStateTableReader.kt | 219 ------------- .../formats/azure/AzureResourceTableReader.kt | 246 -------------- .../opendc/trace/formats/azure/AzureTraceFormat.kt | 147 --------- .../BitbrainsExResourceStateTableReader.kt | 292 ----------------- .../formats/bitbrains/BitbrainsExTraceFormat.kt | 135 -------- .../bitbrains/BitbrainsResourceStateTableReader.kt | 365 --------------------- .../bitbrains/BitbrainsResourceTableReader.kt | 175 ---------- .../formats/bitbrains/BitbrainsTraceFormat.kt | 159 --------- .../trace/formats/carbon/CarbonTableReader.kt | 14 +- .../trace/formats/carbon/CarbonTraceFormat.kt | 20 +- .../trace/formats/carbon/parquet/CarbonFragment.kt | 33 ++ .../carbon/parquet/CarbonIntensityFragment.kt | 33 -- .../carbon/parquet/CarbonIntensityReadSupport.kt | 95 ------ .../parquet/CarbonIntensityRecordMaterializer.kt | 86 ----- .../formats/carbon/parquet/CarbonReadSupport.kt | 74 +++++ .../carbon/parquet/CarbonRecordMaterializer.kt | 86 +++++ .../trace/formats/carbon/parquet/CarbonSchemas.kt | 43 +++ .../formats/failure/parquet/FailureReadSupport.kt | 30 +- .../failure/parquet/FailureRecordMaterializer.kt | 2 +- .../formats/failure/parquet/FailureSchemas.kt | 44 +++ .../opendc/trace/formats/gwf/GwfTaskTableReader.kt | 286 ---------------- .../org/opendc/trace/formats/gwf/GwfTraceFormat.kt | 104 
------ .../opendc/OdcVmInterferenceJsonTableReader.kt | 225 ------------- .../opendc/OdcVmInterferenceJsonTableWriter.kt | 192 ----------- .../opendc/OdcVmResourceStateTableReader.kt | 175 ---------- .../opendc/OdcVmResourceStateTableWriter.kt | 221 ------------- .../formats/opendc/OdcVmResourceTableReader.kt | 209 ------------ .../formats/opendc/OdcVmResourceTableWriter.kt | 249 -------------- .../trace/formats/opendc/OdcVmTraceFormat.kt | 202 ------------ .../trace/formats/opendc/parquet/Resource.kt | 43 --- .../formats/opendc/parquet/ResourceReadSupport.kt | 214 ------------ .../opendc/parquet/ResourceRecordMaterializer.kt | 207 ------------ .../trace/formats/opendc/parquet/ResourceState.kt | 36 -- .../opendc/parquet/ResourceStateReadSupport.kt | 161 --------- .../parquet/ResourceStateRecordMaterializer.kt | 133 -------- .../opendc/parquet/ResourceStateWriteSupport.kt | 112 ------- .../formats/opendc/parquet/ResourceWriteSupport.kt | 139 -------- .../opendc/trace/formats/swf/SwfTaskTableReader.kt | 236 ------------- .../org/opendc/trace/formats/swf/SwfTraceFormat.kt | 100 ------ .../formats/wfformat/WfFormatTaskTableReader.kt | 314 ------------------ .../trace/formats/wfformat/WfFormatTraceFormat.kt | 95 ------ .../trace/formats/workload/FragmentTableReader.kt | 154 +++++++++ .../trace/formats/workload/FragmentTableWriter.kt | 193 +++++++++++ .../trace/formats/workload/TaskTableReader.kt | 213 ++++++++++++ .../trace/formats/workload/TaskTableWriter.kt | 256 +++++++++++++++ .../trace/formats/workload/WorkloadTraceFormat.kt | 165 ++++++++++ .../trace/formats/workload/parquet/Fragment.kt | 32 ++ .../workload/parquet/FragmentReadSupport.kt | 79 +++++ .../workload/parquet/FragmentRecordMaterializer.kt | 128 ++++++++ .../formats/workload/parquet/FragmentSchemas.kt | 80 +++++ .../workload/parquet/FragmentWriteSupport.kt | 100 ++++++ .../opendc/trace/formats/workload/parquet/Task.kt | 44 +++ .../formats/workload/parquet/TaskReadSupport.kt | 101 ++++++ 
.../workload/parquet/TaskRecordMaterializer.kt | 215 ++++++++++++ .../trace/formats/workload/parquet/TaskSchemas.kt | 166 ++++++++++ .../formats/workload/parquet/TaskWriteSupport.kt | 96 ++++++ .../opendc/trace/formats/wtf/WtfTaskTableReader.kt | 187 ----------- .../org/opendc/trace/formats/wtf/WtfTraceFormat.kt | 102 ------ .../org/opendc/trace/formats/wtf/parquet/Task.kt | 42 --- .../trace/formats/wtf/parquet/TaskReadSupport.kt | 148 --------- .../formats/wtf/parquet/TaskRecordMaterializer.kt | 188 ----------- .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 16 +- 70 files changed, 2444 insertions(+), 6660 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt delete mode 100644 
opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt delete mode 100644 
opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt 
delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt delete 
mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt (limited to 'opendc-trace/opendc-trace-api/src/main') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt new file mode 100644 index 00000000..32cdd78b --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("CarbonColumns") + +package org.opendc.trace.conv + +/** + * A column containing the timestamp of the carbon intensity measurement. + */ +public const val CARBON_TIMESTAMP: String = "timestamp" + +/** + * A column containing the intensity of the carbon when sampled. + */ +public const val CARBON_INTENSITY: String = "carbon_intensity" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt deleted file mode 100644 index de74c4fd..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("CarbonIntensityColumns") - -package org.opendc.trace.conv - -/** - * A column containing the task identifier. - */ -public const val CARBON_INTENSITY_TIMESTAMP: String = "timestamp" - -/** - * A column containing the task identifier. - */ -public const val CARBON_INTENSITY_VALUE: String = "carbon_intensity" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt new file mode 100644 index 00000000..e0d01ef2 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FragmentColumns") + +package org.opendc.trace.conv + +/** + * Duration for the fragment. + */ +public const val FRAGMENT_DURATION: String = "duration" + +/** + * Total CPU usage during the fragment in MHz. + */ +public const val FRAGMENT_CPU_USAGE: String = "cpu_usage" + +/** + * Total GPU usage during the fragment in MHz. + */ +public const val FRAGMENT_GPU_USAGE: String = "gpu_usage" + +/** + * Memory usage during the fragment in KB. + */ +public const val FRAGMENT_MEM_USAGE: String = "mem_usage" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt deleted file mode 100644 index fbbfdea9..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("InterferenceGroupColumns") - -package org.opendc.trace.conv - -/** - * Members of the interference group. - */ -public const val INTERFERENCE_GROUP_MEMBERS: String = "members" - -/** - * Target load after which the interference occurs. - */ -public const val INTERFERENCE_GROUP_TARGET: String = "target" - -/** - * Performance score when the interference occurs. - */ -public const val INTERFERENCE_GROUP_SCORE: String = "score" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt deleted file mode 100644 index 3d0341b2..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ResourceColumns") - -package org.opendc.trace.conv - -/** - * Identifier of the resource. - */ -@JvmField -public val resourceID: String = "id" - -/** - * The cluster to which the resource belongs. - */ -@JvmField -public val resourceClusterID: String = "cluster_id" - -/** - * Start time for the resource. - */ -@JvmField -public val resourceSubmissionTime: String = "submission_time" - -/** - * Carbon intensity of the resource. - */ -@JvmField -public val resourceCarbonIntensity: String = "carbon_intensity" - -/** - * End time for the resource. - */ -@JvmField -public val resourceDuration: String = "duration" - -/** - * Number of CPUs for the resource. - */ -@JvmField -public val resourceCpuCount: String = "cpu_count" - -/** - * Total CPU capacity of the resource in MHz. - */ -@JvmField -public val resourceCpuCapacity: String = "cpu_capacity" - -/** - * Memory capacity for the resource in KB. - */ -@JvmField -public val resourceMemCapacity: String = "mem_capacity" - -/** - * Number of GPU cores for the resource. - */ -@JvmField -public val resourceGpuCount: String = "gpu_count" - -/** - * Total GPU capacity of the resource in MHz. - */ -@JvmField -public val resourceGpuCapacity: String = "gpu_capacity" - -/** - * Total GPU memory capacity of the resource in MB. - */ -@JvmField -public val resourceGpuMemCapacity: String = "gpu_mem_capacity" - -/** - * The parents of the resource that need to be completed before this resource can be used. 
- */ -@JvmField -public val resourceParents: String = "parents" - -/** - * The children of the resource that cannot be started before this is completed. - */ -@JvmField -public val resourceChildren: String = "children" - -/** - * Nature of the task. Delayable, interruptible, etc. - */ -@JvmField -public val resourceNature: String = "nature" - -/** - * Deadline of the task. - */ -@JvmField -public val resourceDeadline: String = "deadline" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt deleted file mode 100644 index f4ab7759..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -@file:JvmName("ResourceStateColumns") - -package org.opendc.trace.conv - -/** - * The timestamp at which the state was recorded. - */ -@JvmField -public val resourceStateTimestamp: String = "timestamp" - -/** - * Duration for the state. - */ -@JvmField -public val resourceStateDuration: String = "duration" - -/** - * A flag to indicate that the resource is powered on. - */ -@JvmField -public val resourceStatePoweredOn: String = "powered_on" - -/** - * Total CPU usage of the resource in MHz. - */ -@JvmField -public val resourceStateCpuUsage: String = "cpu_usage" - -/** - * Total CPU usage of the resource in percentage. - */ -@JvmField -public val resourceStateCpuUsagePct: String = "cpu_usage_pct" - -/** - * Total CPU demand of the resource in MHz. - */ -@JvmField -public val resourceStateCpuDemand: String = "cpu_demand" - -/** - * CPU ready percentage. - */ -@JvmField -public val resourceStateCpuReadyPct: String = "cpu_ready_pct" - -/** - * Memory usage of the resource in KB. - */ -@JvmField -public val resourceStateMemUsage: String = "mem_usage" - -/** - * Disk read throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateDiskRead: String = "disk_read" - -/** - * Disk write throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateDiskWrite: String = "disk_write" - -/** - * Network receive throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateNetRx: String = "net_rx" - -/** - * Network transmit throughput of the resource in KB/s. - */ -@JvmField -public val resourceStateNetTx: String = "net_tx" - -/** - * Total GPU capacity of the resource in MHz. 
- */ -@JvmField -public val resourceStateGpuUsage: String = "gpu_usage" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt index d4019f73..310d268a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt @@ -24,31 +24,22 @@ package org.opendc.trace.conv -/** - * A table containing all workflows in a workload. - */ -public const val TABLE_WORKFLOWS: String = "workflows" - /** * A table containing all tasks in a workload. */ public const val TABLE_TASKS: String = "tasks" /** - * A table containing all resources in a workload. + * A table containing all resource states in a workload. */ -public const val TABLE_RESOURCES: String = "resources" +public const val TABLE_FRAGMENTS: String = "fragments" /** - * A table containing all resource states in a workload. + * A table containing the carbon intensities of the region */ -public const val TABLE_RESOURCE_STATES: String = "resource_states" +public const val TABLE_CARBON: String = "carbon" /** - * A table containing the groups of resources that interfere when run on the same execution platform. + * A table containing failures that can be injected during simulation. 
*/ -public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups" - -public const val TABLE_CARBON_INTENSITIES: String = "carbon_intensities" - public const val TABLE_FAILURES: String = "failures" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index 6ca87a60..0df52c71 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -25,61 +25,71 @@ package org.opendc.trace.conv /** - * A column containing the task identifier. + * Identifier of the task. */ public const val TASK_ID: String = "id" /** - * A column containing the identifier of the workflow. + * Identifier of the task. */ -public const val TASK_WORKFLOW_ID: String = "workflow_id" +public const val TASK_NAME: String = "name" /** - * A column containing the submission time of the task. + * The time of submission of the task. */ -public const val TASK_SUBMIT_TIME: String = "submit_time" +public const val TASK_SUBMISSION_TIME: String = "submission_time" /** - * A column containing the wait time of the task. + * The duration of a task in ms */ -public const val TASK_WAIT_TIME: String = "wait_time" +public const val TASK_DURATION: String = "duration" /** - * A column containing the runtime time of the task. + * Number of CPUs for the task. */ -public const val TASK_RUNTIME: String = "runtime" +public const val TASK_CPU_COUNT: String = "cpu_count" /** - * A column containing the parents of a task. + * Total CPU capacity of the task in MHz. */ -public const val TASK_PARENTS: String = "parents" +public const val TASK_CPU_CAPACITY: String = "cpu_capacity" /** - * A column containing the children of a task. + * Memory capacity for the task in KB. 
*/ -public const val TASK_CHILDREN: String = "children" +public const val TASK_MEM_CAPACITY: String = "mem_capacity" + +/** + * Number of GPU cores for the task. + */ +public const val TASK_GPU_COUNT: String = "gpu_count" /** - * A column containing the requested CPUs of a task. + * Total GPU capacity of the task in MHz. */ -public const val TASK_REQ_NCPUS: String = "req_ncpus" +public const val TASK_GPU_CAPACITY: String = "gpu_capacity" /** - * A column containing the allocated CPUs of a task. + * Total GPU memory capacity of the task in MB. */ -public const val TASK_ALLOC_NCPUS: String = "alloc_ncpus" +public const val TASK_GPU_MEM_CAPACITY: String = "gpu_mem_capacity" /** - * A column containing the status of a task. + * The parents of the task that need to be completed before this task can be used. */ -public const val TASK_STATUS: String = "status" +public const val TASK_PARENTS: String = "parents" + +/** + * The children of the task that cannot be started before this is completed. + */ +public const val TASK_CHILDREN: String = "children" /** - * A column containing the group id of a task. + * Nature of the task. Delayable, interruptible, etc. */ -public const val TASK_GROUP_ID: String = "group_id" +public const val TASK_NATURE: String = "nature" /** - * A column containing the user id of a task. + * Deadline of the task. 
*/ -public const val TASK_USER_ID: String = "user_id" +public const val TASK_DEADLINE: String = "deadline" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt deleted file mode 100644 index bcf6ff52..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateTimestamp -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Azure v1 VM resource state table. - */ -internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. - */ - private var isStarted = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) - "vm id" -> id = parser.text - "CPU avg cpu" -> cpuUsagePct = (parser.doubleValue / 100.0) // Convert from % to [0, 1] - } - } - - return true - } - - private val colID = 0 - private val colTimestamp = 1 - private val colCpuUsagePct = 2 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceStateCpuUsagePct -> colCpuUsagePct - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colCpuUsagePct) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - throw IllegalArgumentException("Invalid column") - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid 
column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colCpuUsagePct -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String? { - checkActive() - return when (index) { - colID -> id - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colTimestamp -> timestamp - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var timestamp: Instant? = null - private var cpuUsagePct = Double.NaN - - /** - * Reset the state. 
- */ - private fun reset() { - id = null - timestamp = null - cpuUsagePct = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = - CsvSchema.builder() - .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) - .addColumn("vm id", CsvSchema.ColumnType.STRING) - .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt deleted file mode 100644 index 55f26fa6..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceSubmissionTime -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Azure v1 VM resources table. - */ -internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. 
- */ - private var isStarted = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "vm id" -> id = parser.text - "timestamp vm created" -> startTime = Instant.ofEpochSecond(parser.longValue) - "timestamp vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue) - "vm virtual core count" -> cpuCores = parser.intValue - "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB - } - } - - return true - } - - private val colID = 0 - private val colStartTime = 1 - private val colStopTime = 2 - private val colCpuCount = 3 - private val colMemCapacity = 4 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceSubmissionTime -> colStartTime - resourceDuration -> colStopTime - resourceCpuCount -> colCpuCount - resourceMemCapacity -> colMemCapacity - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colMemCapacity) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colCpuCount -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - checkActive() - return when (index) { - colCpuCount -> cpuCores.toLong() - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colMemCapacity -> memCapacity - else -> throw IllegalArgumentException("Invalid 
column") - } - } - - override fun getString(index: Int): String? { - checkActive() - return when (index) { - colID -> id - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colStartTime -> startTime - colStopTime -> stopTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - throw IllegalArgumentException("Invalid column") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var startTime: Instant? = null - private var stopTime: Instant? = null - private var cpuCores = -1 - private var memCapacity = Double.NaN - - /** - * Reset the state. - */ - private fun reset() { - id = null - startTime = null - stopTime = null - cpuCores = -1 - memCapacity = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. 
- */ - private val schema = - CsvSchema.builder() - .addColumn("vm id", CsvSchema.ColumnType.NUMBER) - .addColumn("subscription id", CsvSchema.ColumnType.STRING) - .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) - .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("vm category", CsvSchema.ColumnType.NUMBER) - .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) - .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt deleted file mode 100644 index 7ce1c11a..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import java.util.zip.GZIPInputStream -import kotlin.io.path.inputStream -import kotlin.io.path.name - -/** - * A format implementation for the Azure v1 format. - */ -public class AzureTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "azure" - - /** - * The [CsvFactory] used to create the parser. 
- */ - private val factory = - CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceSubmissionTime, TableColumnType.Instant), - TableColumn(resourceDuration, TableColumnType.Instant), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceMemCapacity, TableColumnType.Double), - ), - ) - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_RESOURCES -> { - val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) - AzureResourceTableReader(factory.createParser(stream)) - } - TABLE_RESOURCE_STATES -> newResourceStateReader(path) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } - - /** - * Construct a [TableReader] for reading over all VM CPU readings. 
- */ - private fun newResourceStateReader(path: Path): TableReader { - val partitions = - Files.walk(path.resolve("vm_cpu_readings"), 1) - .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") } - .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it })) - .toSortedMap() - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, partPath) = it.next() - val stream = GZIPInputStream(partPath.inputStream()) - return AzureResourceStateTableReader(factory.createParser(stream)) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt deleted file mode 100644 index 8387d1ed..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceClusterID -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuDemand -import org.opendc.trace.conv.resourceStateCpuReadyPct -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateTimestamp -import java.io.BufferedReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Bitbrains resource state table. - */ -internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader { - private var state = State.Pending - - override fun nextRow(): Boolean { - val state = state - if (state == State.Closed) { - return false - } else if (state == State.Pending) { - this.state = State.Active - } - - reset() - - var line: String? 
- var num = 0 - - while (true) { - line = reader.readLine() - - if (line == null) { - this.state = State.Closed - return false - } - - num++ - - if (line[0] == '#' || line.isBlank()) { - // Ignore empty lines or comments - continue - } - - break - } - - line = line!!.trim() - - val length = line.length - var col = 0 - var start: Int - var end = 0 - - while (end < length) { - // Trim all whitespace before the field - start = end - while (start < length && line[start].isWhitespace()) { - start++ - } - - end = line.indexOf(' ', start) - - if (end < 0) { - end = length - } - - val field = line.subSequence(start, end) as String - when (col++) { - colTimestamp -> timestamp = Instant.ofEpochSecond(field.toLong(10)) - colCpuUsage -> cpuUsage = field.toDouble() - colCpuDemand -> cpuDemand = field.toDouble() - colDiskRead -> diskRead = field.toDouble() - colDiskWrite -> diskWrite = field.toDouble() - colClusterID -> cluster = field.trim() - colNcpus -> cpuCores = field.toInt(10) - colCpuReadyPct -> cpuReadyPct = field.toDouble() - colPoweredOn -> poweredOn = field.toInt(10) == 1 - colCpuCapacity -> cpuCapacity = field.toDouble() - colID -> id = field.trim() - colMemCapacity -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB - } - } - - return true - } - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceClusterID -> colClusterID - resourceStateTimestamp -> colTimestamp - resourceCpuCount -> colNcpus - resourceCpuCapacity -> colCpuCapacity - resourceStateCpuUsage -> colCpuUsage - resourceStateCpuUsagePct -> colCpuUsagePct - resourceStateCpuDemand -> colCpuDemand - resourceStateCpuReadyPct -> colCpuReadyPct - resourceMemCapacity -> colMemCapacity - resourceStateDiskRead -> colDiskRead - resourceStateDiskWrite -> colDiskWrite - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0 until colMax) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): 
Boolean { - check(state == State.Active) { "No active row" } - return when (index) { - colPoweredOn -> poweredOn - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getInt(index: Int): Int { - check(state == State.Active) { "No active row" } - return when (index) { - colNcpus -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - check(state == State.Active) { "No active row" } - return when (index) { - colCpuCapacity -> cpuCapacity - colCpuUsage -> cpuUsage - colCpuUsagePct -> cpuUsage / cpuCapacity - colCpuReadyPct -> cpuReadyPct - colCpuDemand -> cpuDemand - colMemCapacity -> memCapacity - colDiskRead -> diskRead - colDiskWrite -> diskWrite - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String? { - check(state == State.Active) { "No active row" } - return when (index) { - colID -> id - colClusterID -> cluster - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - check(state == State.Active) { "No active row" } - return when (index) { - colTimestamp -> timestamp - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? 
{ - throw IllegalArgumentException("Invalid column") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - reset() - state = State.Closed - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var cluster: String? = null - private var timestamp: Instant? = null - private var cpuCores = -1 - private var cpuCapacity = Double.NaN - private var cpuUsage = Double.NaN - private var cpuDemand = Double.NaN - private var cpuReadyPct = Double.NaN - private var memCapacity = Double.NaN - private var diskRead = Double.NaN - private var diskWrite = Double.NaN - private var poweredOn: Boolean = false - - /** - * Reset the state of the reader. - */ - private fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } - - /** - * Default column indices for the extended Bitbrains format. 
- */ - private val colTimestamp = 0 - private val colCpuUsage = 1 - private val colCpuDemand = 2 - private val colDiskRead = 4 - private val colDiskWrite = 6 - private val colClusterID = 10 - private val colNcpus = 12 - private val colCpuReadyPct = 13 - private val colPoweredOn = 14 - private val colCpuCapacity = 18 - private val colID = 19 - private val colMemCapacity = 20 - private val colCpuUsagePct = 21 - private val colMax = colCpuUsagePct + 1 - - private enum class State { - Pending, - Active, - Closed, - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt deleted file mode 100644 index 6115953f..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceClusterID -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuDemand -import org.opendc.trace.conv.resourceStateCpuReadyPct -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * A format implementation for the extended Bitbrains trace format. - */ -public class BitbrainsExTraceFormat : TraceFormat { - /** - * The name of this trace format. 
- */ - override val name: String = "bitbrains-ex" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List = listOf(TABLE_RESOURCE_STATES) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceClusterID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceCpuCapacity, TableColumnType.Double), - TableColumn(resourceStateCpuUsage, TableColumnType.Double), - TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), - TableColumn(resourceStateCpuDemand, TableColumnType.Double), - TableColumn(resourceStateCpuReadyPct, TableColumnType.Double), - TableColumn(resourceMemCapacity, TableColumnType.Double), - TableColumn(resourceStateDiskRead, TableColumnType.Double), - TableColumn(resourceStateDiskWrite, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_RESOURCE_STATES -> newResourceStateReader(path) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } - - /** - * Construct a [TableReader] for reading over all resource state partitions. 
- */ - private fun newResourceStateReader(path: Path): TableReader { - val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, partPath) = it.next() - return BitbrainsExResourceStateTableReader(partPath.bufferedReader()) - } else { - null - } - } - - override fun toString(): String = "BitbrainsExCompositeTableReader" - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt deleted file mode 100644 index e264fccb..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateMemUsage -import org.opendc.trace.conv.resourceStateNetRx -import org.opendc.trace.conv.resourceStateNetTx -import org.opendc.trace.conv.resourceStateTimestamp -import java.text.NumberFormat -import java.time.Duration -import java.time.Instant -import java.time.LocalDateTime -import java.time.ZoneOffset -import java.time.format.DateTimeFormatter -import java.time.format.DateTimeParseException -import java.util.Locale -import java.util.UUID - -/** - * A [TableReader] for the Bitbrains resource state table. - */ -internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. - */ - private var isStarted = false - - /** - * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace. - */ - private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss") - - /** - * The type of timestamps in the trace. 
- */ - private var timestampType: TimestampType = TimestampType.UNDECIDED - - /** - * The [NumberFormat] used to parse doubles containing a comma. - */ - private val nf = NumberFormat.getInstance(Locale.GERMAN) - - /** - * A flag to indicate that the trace contains decimals with a comma separator. - */ - private var usesCommaDecimalSeparator = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - // Reset the row state - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "Timestamp [ms]" -> { - timestamp = - when (timestampType) { - TimestampType.UNDECIDED -> { - try { - val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) - timestampType = TimestampType.DATE_TIME - res - } catch (e: DateTimeParseException) { - timestampType = TimestampType.EPOCH_MILLIS - Instant.ofEpochSecond(parser.longValue) - } - } - TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) - TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) - } - } - "CPU cores" -> cpuCores = parser.intValue - "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble() - "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble() - "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1] - "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble() - "Memory usage [KB]" -> memUsage = parseSafeDouble() - "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble() - "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble() - "Network received throughput [KB/s]" -> netReceived = parseSafeDouble() - "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble() - } - } - - return true - } - - private val colTimestamp = 0 - private val 
colCpuCount = 1 - private val colCpuCapacity = 2 - private val colCpuUsage = 3 - private val colCpuUsagePct = 4 - private val colMemCapacity = 5 - private val colMemUsage = 6 - private val colDiskRead = 7 - private val colDiskWrite = 8 - private val colNetRx = 9 - private val colNetTx = 10 - private val colID = 11 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceStateCpuUsage -> colCpuUsage - resourceStateCpuUsagePct -> colCpuUsagePct - resourceMemCapacity -> colMemCapacity - resourceStateMemUsage -> colMemUsage - resourceStateDiskRead -> colDiskRead - resourceStateDiskWrite -> colDiskWrite - resourceStateNetRx -> colNetRx - resourceStateNetTx -> colNetTx - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colCpuCount -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colCpuCapacity -> cpuCapacity - colCpuUsage -> cpuUsage - colCpuUsagePct -> cpuUsagePct - colMemCapacity -> memCapacity - colMemUsage -> memUsage - colDiskRead -> diskRead - colDiskWrite -> diskWrite - colNetRx -> netReceived - colNetTx -> netTransmitted - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String { - checkActive() - return when (index) { - colID -> partition - else -> throw 
IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colTimestamp -> timestamp - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - throw IllegalArgumentException("Invalid column") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * Try to parse the current value safely as double. - */ - private fun parseSafeDouble(): Double { - if (!usesCommaDecimalSeparator) { - try { - return parser.doubleValue - } catch (e: JsonParseException) { - usesCommaDecimalSeparator = true - } - } - - val text = parser.text - if (text.isBlank()) { - return 0.0 - } - - return nf.parse(text).toDouble() - } - - /** - * State fields of the reader. - */ - private var timestamp: Instant? 
= null - private var cpuCores = -1 - private var cpuCapacity = Double.NaN - private var cpuUsage = Double.NaN - private var cpuUsagePct = Double.NaN - private var memCapacity = Double.NaN - private var memUsage = Double.NaN - private var diskRead = Double.NaN - private var diskWrite = Double.NaN - private var netReceived = Double.NaN - private var netTransmitted = Double.NaN - - /** - * Reset the state. - */ - private fun reset() { - timestamp = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuUsagePct = Double.NaN - memCapacity = Double.NaN - memUsage = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - netReceived = Double.NaN - netTransmitted = Double.NaN - } - - /** - * The type of the timestamp in the trace. - */ - private enum class TimestampType { - UNDECIDED, - DATE_TIME, - EPOCH_MILLIS, - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = - CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() - } -} diff --git 
a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt deleted file mode 100644 index a12785f0..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceID -import java.nio.file.Path -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] for the Bitbrains resource table. 
- */ -internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map) : TableReader { - /** - * An iterator to iterate over the resource entries. - */ - private val it = vms.iterator() - - /** - * The state of the reader. - */ - private var state = State.Pending - - override fun nextRow(): Boolean { - if (state == State.Pending) { - state = State.Active - } - - reset() - - while (it.hasNext()) { - val (name, path) = it.next() - - val parser = factory.createParser(path.toFile()) - val reader = BitbrainsResourceStateTableReader(name, parser) - val idCol = reader.resolve(resourceID) - - try { - if (!reader.nextRow()) { - continue - } - - id = reader.getString(idCol) - return true - } finally { - reader.close() - } - } - - state = State.Closed - return false - } - - private val colID = 0 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - throw IllegalArgumentException("Invalid column") - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String? { - check(state == State.Active) { "No active row" } - return when (index) { - colID -> id - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? 
{ - throw IllegalArgumentException("Invalid column") - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - throw IllegalArgumentException("Invalid column") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reset() - state = State.Closed - } - - /** - * State fields of the reader. - */ - private var id: String? = null - - /** - * Reset the state of the reader. - */ - private fun reset() { - id = null - } - - private enum class State { - Pending, - Active, - Closed, - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt deleted file mode 100644 index 23853077..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateCpuUsagePct -import org.opendc.trace.conv.resourceStateDiskRead -import org.opendc.trace.conv.resourceStateDiskWrite -import org.opendc.trace.conv.resourceStateMemUsage -import org.opendc.trace.conv.resourceStateNetRx -import org.opendc.trace.conv.resourceStateNetTx -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * A format implementation for the GWF trace format. - */ -public class BitbrainsTraceFormat : TraceFormat { - /** - * The name of this trace format. 
- */ - override val name: String = "bitbrains" - - /** - * The [CsvFactory] used to create the parser. - */ - private val factory = - CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - ), - ) - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceCpuCapacity, TableColumnType.Double), - TableColumn(resourceStateCpuUsage, TableColumnType.Double), - TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), - TableColumn(resourceMemCapacity, TableColumnType.Double), - TableColumn(resourceStateMemUsage, TableColumnType.Double), - TableColumn(resourceStateDiskRead, TableColumnType.Double), - TableColumn(resourceStateDiskWrite, TableColumnType.Double), - TableColumn(resourceStateNetRx, TableColumnType.Double), - TableColumn(resourceStateNetTx, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_RESOURCES -> { - val vms = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - BitbrainsResourceTableReader(factory, vms) - } - TABLE_RESOURCE_STATES -> newResourceStateReader(path) - else -> throw IllegalArgumentException("Table 
$table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } - - /** - * Construct a [TableReader] for reading over all resource state partitions. - */ - private fun newResourceStateReader(path: Path): TableReader { - val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (partition, partPath) = it.next() - return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile())) - } else { - null - } - } - - override fun toString(): String = "BitbrainsCompositeTableReader" - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt index 226c8806..c5face9f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt @@ -23,9 +23,9 @@ package org.opendc.trace.formats.carbon import org.opendc.trace.TableReader -import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP -import org.opendc.trace.conv.CARBON_INTENSITY_VALUE -import org.opendc.trace.formats.carbon.parquet.CarbonIntensityFragment +import org.opendc.trace.conv.CARBON_INTENSITY +import org.opendc.trace.conv.CARBON_TIMESTAMP +import org.opendc.trace.formats.carbon.parquet.CarbonFragment import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant @@ -34,11 +34,11 @@ import java.util.UUID /** * A [TableReader] implementation 
for the WTF format. */ -internal class CarbonTableReader(private val reader: LocalParquetReader) : TableReader { +internal class CarbonTableReader(private val reader: LocalParquetReader) : TableReader { /** * The current record. */ - private var record: CarbonIntensityFragment? = null + private var record: CarbonFragment? = null override fun nextRow(): Boolean { try { @@ -57,8 +57,8 @@ internal class CarbonTableReader(private val reader: LocalParquetReader colTimestamp - CARBON_INTENSITY_VALUE -> colCarbonIntensity + CARBON_TIMESTAMP -> colTimestamp + CARBON_INTENSITY -> colCarbonIntensity else -> -1 } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt index d8adc739..764bb349 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt @@ -26,10 +26,10 @@ import org.opendc.trace.TableColumn import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader import org.opendc.trace.TableWriter -import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP -import org.opendc.trace.conv.CARBON_INTENSITY_VALUE -import org.opendc.trace.conv.TABLE_CARBON_INTENSITIES -import org.opendc.trace.formats.carbon.parquet.CarbonIntensityReadSupport +import org.opendc.trace.conv.CARBON_INTENSITY +import org.opendc.trace.conv.CARBON_TIMESTAMP +import org.opendc.trace.conv.TABLE_CARBON +import org.opendc.trace.formats.carbon.parquet.CarbonReadSupport import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalParquetReader @@ -45,18 +45,18 @@ public class CarbonTraceFormat : TraceFormat { throw UnsupportedOperationException("Writing not supported for this format") } - override fun getTables(path: Path): 
List = listOf(TABLE_CARBON_INTENSITIES) + override fun getTables(path: Path): List = listOf(TABLE_CARBON) override fun getDetails( path: Path, table: String, ): TableDetails { return when (table) { - TABLE_CARBON_INTENSITIES -> + TABLE_CARBON -> TableDetails( listOf( - TableColumn(CARBON_INTENSITY_TIMESTAMP, TableColumnType.Instant), - TableColumn(CARBON_INTENSITY_VALUE, TableColumnType.Double), + TableColumn(CARBON_TIMESTAMP, TableColumnType.Instant), + TableColumn(CARBON_INTENSITY, TableColumnType.Double), ), ) else -> throw IllegalArgumentException("Table $table not supported") @@ -69,8 +69,8 @@ public class CarbonTraceFormat : TraceFormat { projection: List?, ): TableReader { return when (table) { - TABLE_CARBON_INTENSITIES -> { - val reader = LocalParquetReader(path, CarbonIntensityReadSupport(projection)) + TABLE_CARBON -> { + val reader = LocalParquetReader(path, CarbonReadSupport(projection)) CarbonTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt new file mode 100644 index 00000000..fe05876b --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be 
included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import java.time.Instant + +/** + * A task in the Workflow Trace Format. + */ +internal data class CarbonFragment( + val timestamp: Instant, + val carbonIntensity: Double, +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt deleted file mode 100644 index 3211cb6c..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.carbon.parquet - -import java.time.Instant - -/** - * A task in the Workflow Trace Format. - */ -internal data class CarbonIntensityFragment( - val timestamp: Instant, - val carbonIntensity: Double, -) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt deleted file mode 100644 index 2f4eac05..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.carbon.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP -import org.opendc.trace.conv.CARBON_INTENSITY_VALUE - -/** - * A [ReadSupport] instance for [Task] objects. - * - * @param projection The projection of the table to read. - */ -internal class CarbonIntensityReadSupport(private val projection: List?) : ReadSupport() { - /** - * Mapping of table columns to their Parquet column names. 
- */ - private val colMap = - mapOf( - CARBON_INTENSITY_TIMESTAMP to "timestamp", - CARBON_INTENSITY_VALUE to "carbon_intensity", - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } - - for (col in projection) { - val fieldName = colMap[col] ?: continue - addField(fieldByName.getValue(fieldName)) - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer = CarbonIntensityRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_intensity"), - ) - .named("carbon_intensity_fragment") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt deleted file mode 100644 index f5d68f22..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without 
restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.carbon.parquet - -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Instant - -/** - * A [RecordMaterializer] for [Task] records. - */ -internal class CarbonIntensityRecordMaterializer(schema: MessageType) : RecordMaterializer() { - /** - * State of current record being read. - */ - private var localTimestamp: Instant = Instant.MIN - private var localCarbonIntensity: Double = 0.0 - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. 
- */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "timestamp" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localTimestamp = Instant.ofEpochMilli(value) - } - } - "carbon_intensity" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localCarbonIntensity = value - } - } - else -> error("Unknown column $type") - } - } - - override fun start() { - localTimestamp = Instant.MIN - localCarbonIntensity = 0.0 - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): CarbonIntensityFragment = - CarbonIntensityFragment( - localTimestamp, - localCarbonIntensity, - ) - - override fun getRootConverter(): GroupConverter = root -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt new file mode 100644 index 00000000..53087079 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.Types +import org.opendc.trace.conv.CARBON_INTENSITY +import org.opendc.trace.conv.CARBON_TIMESTAMP + +/** + * A [ReadSupport] instance for [CarbonFragment] objects. + * + * @param projection The projection of the table to read. + */ +internal class CarbonReadSupport(private val projection: List?) : ReadSupport() { + /** + * Mapping of table columns to their Parquet column names. 
+ */ + private val colMap = + mapOf( + CARBON_TIMESTAMP to "timestamp", + CARBON_INTENSITY to "carbon_intensity", + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val fieldByName = CARBON_SCHEMA.fields.associateBy { it.name } + + for (col in projection) { + val fieldName = colMap[col] ?: continue + addField(fieldByName.getValue(fieldName)) + } + } + .named(CARBON_SCHEMA.name) + } else { + CARBON_SCHEMA + } + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer = CarbonRecordMaterializer(readContext.requestedSchema) +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt new file mode 100644 index 00000000..aa915a39 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter +import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import java.time.Instant + +/** + * A [RecordMaterializer] for [CarbonFragment] records. + */ +internal class CarbonRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. + */ + private var localTimestamp: Instant = Instant.MIN + private var localCarbonIntensity: Double = 0.0 + + /** + * Root converter for the record. + */ + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. 
+ */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "timestamp" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localTimestamp = Instant.ofEpochMilli(value) + } + } + "carbon_intensity" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCarbonIntensity = value + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + localTimestamp = Instant.MIN + localCarbonIntensity = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): CarbonFragment = + CarbonFragment( + localTimestamp, + localCarbonIntensity, + ) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt new file mode 100644 index 00000000..c8b11968 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.carbon.parquet + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types + +private val CARBON_SCHEMA_v1: MessageType = + Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("carbon_intensity"), + ) + .named("carbon_intensity_fragment") + +public val CARBON_SCHEMA: MessageType = CARBON_SCHEMA_v1 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt index d49f86c6..9bd8fd72 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt @@ -27,14 +27,13 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types import 
org.opendc.trace.conv.FAILURE_DURATION import org.opendc.trace.conv.FAILURE_INTENSITY import org.opendc.trace.conv.FAILURE_INTERVAL /** - * A [ReadSupport] instance for [Task] objects. + * A [ReadSupport] instance for [FailureFragment] objects. * * @param projection The projection of the table to read. */ @@ -54,16 +53,16 @@ internal class FailureReadSupport(private val projection: List?) : ReadS if (projection != null) { Types.buildMessage() .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + val fieldByName = FAILURE_SCHEMA.fields.associateBy { it.name } for (col in projection) { val fieldName = colMap[col] ?: continue addField(fieldByName.getValue(fieldName)) } } - .named(READ_SCHEMA.name) + .named(FAILURE_SCHEMA.name) } else { - READ_SCHEMA + FAILURE_SCHEMA } return ReadContext(projectedSchema) } @@ -74,25 +73,4 @@ internal class FailureReadSupport(private val projection: List?) : ReadS fileSchema: MessageType, readContext: ReadContext, ): RecordMaterializer = FailureRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. 
- */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("failure_interval"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("failure_duration"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("failure_intensity"), - ) - .named("failure_fragment") - } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt index 5a00f8c9..83281984 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt @@ -29,7 +29,7 @@ import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.MessageType /** - * A [RecordMaterializer] for [Task] records. + * A [RecordMaterializer] for [FailureFragment] records. 
*/ internal class FailureRecordMaterializer(schema: MessageType) : RecordMaterializer() { /** diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt new file mode 100644 index 00000000..bafac387 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.formats.failure.parquet + +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types + +private val FAILURE_SCHEMA_v1: MessageType = + Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("failure_interval"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("failure_duration"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("failure_intensity"), + ) + .named("failure_fragment") + +public val FAILURE_SCHEMA: MessageType = FAILURE_SCHEMA_v1 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt deleted file mode 100644 index 8a2a99cb..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import java.time.Duration -import java.time.Instant -import java.util.UUID -import java.util.regex.Pattern - -/** - * A [TableReader] implementation for the GWF format. - */ -internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. 
- */ - private var isStarted = false - - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - } - - // Reset the row state - reset() - - if (parser.isClosed || !nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "WorkflowID" -> workflowId = parser.text - "JobID" -> jobId = parser.text - "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) - "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) - "NProcs" -> nProcs = parser.intValue - "ReqNProcs" -> reqNProcs = parser.intValue - "Dependencies" -> dependencies = parseParents(parser.valueAsString) - } - } - - return true - } - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colJobID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_RUNTIME -> colRuntime - TASK_ALLOC_NCPUS -> colNproc - TASK_REQ_NCPUS -> colReqNproc - TASK_PARENTS -> colDeps - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colDeps) { "Invalid column" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colReqNproc -> reqNProcs - colNproc -> nProcs - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String? 
{ - checkActive() - return when (index) { - colJobID -> jobId - colWorkflowID -> workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - checkActive() - return when (index) { - colSubmitTime -> submitTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - checkActive() - return when (index) { - colRuntime -> runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - checkActive() - return when (index) { - colDeps -> typeDeps.convertTo(dependencies, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * The pattern used to parse the parents. - */ - private val pattern = Pattern.compile("\\s+") - - /** - * Parse the parents into a set of longs. - */ - private fun parseParents(value: String): Set { - val result = mutableSetOf() - val deps = value.split(pattern) - - for (dep in deps) { - if (dep.isBlank()) { - continue - } - - result.add(dep) - } - - return result - } - - /** - * Advance the parser until the next object start. 
- */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * Reader state fields. - */ - private var workflowId: String? = null - private var jobId: String? = null - private var submitTime: Instant? = null - private var runtime: Duration? = null - private var nProcs = -1 - private var reqNProcs = -1 - private var dependencies = emptySet() - - /** - * Reset the state. - */ - private fun reset() { - workflowId = null - jobId = null - submitTime = null - runtime = null - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } - - private val colWorkflowID = 0 - private val colJobID = 1 - private val colSubmitTime = 2 - private val colRuntime = 3 - private val colNproc = 4 - private val colReqNproc = 5 - private val colDeps = 6 - - private val typeDeps = TableColumnType.Set(TableColumnType.String) - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. 
- */ - private val schema = - CsvSchema.builder() - .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER) - .addColumn("JobID", CsvSchema.ColumnType.NUMBER) - .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER) - .addColumn("RunTime", CsvSchema.ColumnType.NUMBER) - .addColumn("NProcs", CsvSchema.ColumnType.NUMBER) - .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER) - .addColumn("Dependencies", CsvSchema.ColumnType.STRING) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(',') - .build() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt deleted file mode 100644 index 097c5593..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the GWF trace format. - */ -public class GwfTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "gwf" - - /** - * The [CsvFactory] used to create the parser. 
- */ - private val factory = - CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt deleted file mode 100644 index dba971d7..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated 
documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.util.convertTo -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the OpenDC VM interference JSON format. - */ -internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. 
- */ - private var isStarted = false - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - - parser.nextToken() - - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") - } - } - - return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) { - parser.close() - reset() - false - } else { - parseGroup(parser) - true - } - } - - private val colMembers = 0 - private val colTarget = 1 - private val colScore = 2 - - private val typeMembers = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - INTERFERENCE_GROUP_MEMBERS -> colMembers - INTERFERENCE_GROUP_TARGET -> colTarget - INTERFERENCE_GROUP_SCORE -> colScore - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - return when (index) { - colMembers, colTarget, colScore -> false - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getInt(index: Int): Int { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colTarget -> targetLoad - colScore -> score - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun getString(index: Int): String? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getInstant(index: Int): Instant? 
{ - throw IllegalArgumentException("Invalid column $index") - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - checkActive() - return when (index) { - colMembers -> typeMembers.convertTo(members, elementType) - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun close() { - parser.close() - } - - private var members = emptySet() - private var targetLoad = Double.POSITIVE_INFINITY - private var score = 1.0 - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Reset the state. - */ - private fun reset() { - members = emptySet() - targetLoad = Double.POSITIVE_INFINITY - score = 1.0 - } - - /** - * Parse a group an interference JSON file. - */ - private fun parseGroup(parser: JsonParser) { - var targetLoad = Double.POSITIVE_INFINITY - var score = 1.0 - val members = mutableSetOf() - - if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "vms" -> parseGroupMembers(parser, members) - "minServerLoad" -> targetLoad = parser.doubleValue - "performanceScore" -> score = parser.doubleValue - } - } - - this.members = members - this.targetLoad = targetLoad - this.score = score - } - - /** - * Parse the members of a group. 
- */ - private fun parseGroupMembers( - parser: JsonParser, - members: MutableSet, - ) { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_ARRAY) { - if (parser.currentToken() != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected string value for group member") - } - - members.add(parser.text) - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt deleted file mode 100644 index b3286a1c..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc - -import com.fasterxml.jackson.core.JsonGenerator -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableWriter] implementation for the OpenDC VM interference JSON format. - */ -internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter { - /** - * A flag to indicate whether a row has been started. - */ - private var isRowActive = false - - init { - generator.writeStartArray() - } - - override fun startRow() { - // Reset state - members = emptySet() - targetLoad = Double.POSITIVE_INFINITY - score = 1.0 - - // Mark row as active - isRowActive = true - } - - override fun endRow() { - check(isRowActive) { "No active row" } - - generator.writeStartObject() - generator.writeArrayFieldStart("vms") - for (member in members) { - generator.writeString(member) - } - generator.writeEndArray() - generator.writeNumberField("minServerLoad", targetLoad) - generator.writeNumberField("performanceScore", score) - generator.writeEndObject() - } - - override fun resolve(name: String): Int { - return when (name) { - INTERFERENCE_GROUP_MEMBERS -> colMembers - INTERFERENCE_GROUP_TARGET -> colTarget - INTERFERENCE_GROUP_SCORE -> colScore - else -> -1 - } - } - - override fun setBoolean( - index: Int, - value: Boolean, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setInt( - index: Int, - value: Int, - ) { - throw IllegalArgumentException("Invalid column 
$index") - } - - override fun setLong( - index: Int, - value: Long, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setFloat( - index: Int, - value: Float, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setDouble( - index: Int, - value: Double, - ) { - check(isRowActive) { "No active row" } - - when (index) { - colTarget -> targetLoad = (value as Number).toDouble() - colScore -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun setString( - index: Int, - value: String, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setUUID( - index: Int, - value: UUID, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setInstant( - index: Int, - value: Instant, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setDuration( - index: Int, - value: Duration, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setList( - index: Int, - value: List, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setSet( - index: Int, - value: Set, - ) { - check(isRowActive) { "No active row" } - - @Suppress("UNCHECKED_CAST") - when (index) { - colMembers -> members = value as Set - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun setMap( - index: Int, - value: Map, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun flush() { - generator.flush() - } - - override fun close() { - generator.writeEndArray() - generator.close() - } - - private val colMembers = 0 - private val colTarget = 1 - private val colScore = 2 - - private var members = emptySet() - private var targetLoad = Double.POSITIVE_INFINITY - private var score = 1.0 -} diff --git 
a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt deleted file mode 100644 index d474e0ec..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.formats.opendc - -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateGpuUsage -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.formats.opendc.parquet.ResourceState -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader) : TableReader { - /** - * The current record. - */ - private var record: ResourceState? = null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colTimestamp = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuUsage = 4 - private val colGpuCount = 5 - private val colGpuUsage = 6 - private val colMemoryCapacity = 7 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceStateDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceStateCpuUsage -> colCpuUsage - resourceGpuCount -> colGpuCount - resourceStateGpuUsage -> colGpuUsage - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colCpuUsage) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getInt(index: Int): Int { - val record = 
checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colCpuCount -> record.cpuCount - colGpuCount -> record.gpuCount - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getDouble(index: Int): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colCpuUsage -> record.cpuUsage - colGpuUsage -> record.gpuUsage - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colID -> record.id - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colTimestamp -> record.timestamp - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getDuration(index: Int): Duration { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colDuration -> record.duration - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? 
{ - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "OdcVmResourceStateTableReader" -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt deleted file mode 100644 index c6f117d2..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.formats.opendc - -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateGpuUsage -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.formats.opendc.parquet.ResourceState -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableWriter] implementation for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter) : TableWriter { - /** - * The current state for the record that is being written. - */ - private var localIsActive = false - private var localID: String = "" - private var localTimestamp: Instant = Instant.MIN - private var localDuration: Duration = Duration.ZERO - private var localCpuCount: Int = 0 - private var localCpuUsage: Double = Double.NaN - private var localGpuCount: Int = 0 - private var localGpuUsage: Double = Double.NaN - - override fun startRow() { - localIsActive = true - localID = "" - localTimestamp = Instant.MIN - localDuration = Duration.ZERO - localCpuCount = 0 - localCpuUsage = Double.NaN - localGpuCount = 0 - localGpuUsage = Double.NaN - } - - override fun endRow() { - check(localIsActive) { "No active row" } - localIsActive = false - - check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } - - writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage, localGpuCount, localGpuUsage)) - - lastId = localID - lastTimestamp = localTimestamp - } - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - 
resourceStateDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceStateCpuUsage -> colCpuUsage - resourceGpuCount -> colGpuCount - resourceStateGpuUsage -> colGpuUsage - else -> -1 - } - } - - override fun setBoolean( - index: Int, - value: Boolean, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInt( - index: Int, - value: Int, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuCount -> localCpuCount = value - colGpuCount -> localGpuCount = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setLong( - index: Int, - value: Long, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setFloat( - index: Int, - value: Float, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setDouble( - index: Int, - value: Double, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuUsage -> localCpuUsage = value - colGpuUsage -> localGpuUsage = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setString( - index: Int, - value: String, - ) { - check(localIsActive) { "No active row" } - - when (index) { - colID -> localID = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setUUID( - index: Int, - value: UUID, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInstant( - index: Int, - value: Instant, - ) { - check(localIsActive) { "No active row" } - - when (index) { - colTimestamp -> localTimestamp = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setDuration( - index: Int, - value: Duration, - ) { - check(localIsActive) { "No active row" } - - when (index) { - colDuration -> 
localDuration = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setList( - index: Int, - value: List, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setSet( - index: Int, - value: Set, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setMap( - index: Int, - value: Map, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun flush() { - // Not available - } - - override fun close() { - writer.close() - } - - /** - * Last column values that are used to check for correct partitioning. - */ - private var lastId: String? = null - private var lastTimestamp: Instant = Instant.MAX - - private val colID = 0 - private val colTimestamp = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuUsage = 4 - private val colGpuCount = 5 - private val colGpuUsage = 6 -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt deleted file mode 100644 index 495a5d75..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall 
be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc - -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.formats.opendc.parquet.Resource -import org.opendc.trace.util.convertTo -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTableReader(private val reader: LocalParquetReader) : TableReader { - /** - * The current record. - */ - private var record: Resource? 
= null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colSubmissionTime = 1 - private val colDurationTime = 2 - private val colCpuCount = 3 - private val colCpuCapacity = 4 - private val colMemCapacity = 5 - private val colGpuCapacity = 6 - private val colGpuCount = 7 - private val colParents = 8 - private val colChildren = 9 - private val colNature = 10 - private val colDeadline = 11 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceSubmissionTime -> colSubmissionTime - resourceDuration -> colDurationTime - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceMemCapacity -> colMemCapacity - resourceGpuCount -> colGpuCount - resourceGpuCapacity -> colGpuCapacity - resourceParents -> colParents - resourceChildren -> colChildren - resourceNature -> colNature - resourceDeadline -> colDeadline - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colDeadline) { "Invalid column index" } - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colNature -> record.nature == null - colDeadline -> record.deadline == -1L - else -> false - } - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colCpuCount -> record.cpuCount - colGpuCount -> record.gpuCount - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - val record = checkNotNull(record) { "Reader 
in invalid state" } - return when (index) { - colDurationTime -> record.durationTime - colDeadline -> record.deadline - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colCpuCapacity -> record.cpuCapacity - colMemCapacity -> record.memCapacity - colGpuCapacity -> record.gpuCapacity - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String? { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colID -> record.id - colNature -> record.nature - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colSubmissionTime -> record.submissionTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colParents -> typeParents.convertTo(record.parents, elementType) - colChildren -> typeChildren.convertTo(record.children, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? 
{ - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "OdcVmResourceTableReader" -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt deleted file mode 100644 index 022e288a..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.formats.opendc - -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.formats.opendc.parquet.Resource -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableWriter] implementation for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTableWriter(private val writer: ParquetWriter) : TableWriter { - /** - * The current state for the record that is being written. - */ - private var localIsActive = false - private var localId: String = "" - private var localSubmissionTime: Instant = Instant.MIN - private var localDuration: Long = 0L - private var localCpuCount: Int = 0 - private var localCpuCapacity: Double = Double.NaN - private var localMemCapacity: Double = Double.NaN - private var localGpuCount: Int = 0 - private var localGpuCapacity: Double = Double.NaN - private var localParents = mutableSetOf() - private var localChildren = mutableSetOf() - private var localNature: String? 
= null - private var localDeadline: Long = -1 - - override fun startRow() { - localIsActive = true - localId = "" - localSubmissionTime = Instant.MIN - localDuration = 0L - localCpuCount = 0 - localCpuCapacity = Double.NaN - localMemCapacity = Double.NaN - localGpuCount = 0 - localGpuCapacity = Double.NaN - localParents.clear() - localChildren.clear() - localNature = null - localDeadline = -1L - } - - override fun endRow() { - check(localIsActive) { "No active row" } - localIsActive = false - writer.write( - Resource( - localId, - localSubmissionTime, - localDuration, - localCpuCount, - localCpuCapacity, - localMemCapacity, - localGpuCount, - localGpuCapacity, - localParents, - localChildren, - localNature, - localDeadline, - ), - ) - } - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceSubmissionTime -> colSubmissionTime - resourceDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceMemCapacity -> colMemCapacity - resourceGpuCount -> colGpuCount - resourceGpuCapacity -> colGpuCapacity - resourceParents -> colParents - resourceChildren -> colChildren - resourceNature -> colNature - resourceDeadline -> colDeadline - else -> -1 - } - } - - override fun setBoolean( - index: Int, - value: Boolean, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInt( - index: Int, - value: Int, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuCount -> localCpuCount = value - colGpuCount -> localGpuCount = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setLong( - index: Int, - value: Long, - ) { - check(localIsActive) { "No active row" } - when (index) { - colDuration -> localDuration = value - colDeadline -> localDeadline = value - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun setFloat( - 
index: Int, - value: Float, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setDouble( - index: Int, - value: Double, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuCapacity -> localCpuCapacity = value - colMemCapacity -> localMemCapacity = value - colGpuCapacity -> localGpuCapacity = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setString( - index: Int, - value: String, - ) { - check(localIsActive) { "No active row" } - when (index) { - colID -> localId = value - colNature -> localNature = value - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun setUUID( - index: Int, - value: UUID, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInstant( - index: Int, - value: Instant, - ) { - check(localIsActive) { "No active row" } - when (index) { - colSubmissionTime -> localSubmissionTime = value - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun setDuration( - index: Int, - value: Duration, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setList( - index: Int, - value: List, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setSet( - index: Int, - value: Set, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setMap( - index: Int, - value: Map, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun flush() { - // Not available - } - - override fun close() { - writer.close() - } - - private val colID = 0 - private val colSubmissionTime = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuCapacity = 4 - private val colMemCapacity = 5 - private val colGpuCount = 
6 - private val colGpuCapacity = 7 - private val colParents = 8 - private val colChildren = 9 - private val colNature = 10 - private val colDeadline = 11 -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt deleted file mode 100644 index 74e880be..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.formats.opendc - -import com.fasterxml.jackson.core.JsonEncoding -import com.fasterxml.jackson.core.JsonFactory -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetFileWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.conv.resourceSubmissionTime -import org.opendc.trace.formats.opendc.parquet.ResourceReadSupport -import org.opendc.trace.formats.opendc.parquet.ResourceStateReadSupport -import org.opendc.trace.formats.opendc.parquet.ResourceStateWriteSupport -import org.opendc.trace.formats.opendc.parquet.ResourceWriteSupport -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.parquet.LocalParquetWriter 
-import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.exists - -/** - * A [TraceFormat] implementation of the OpenDC virtual machine trace format. - */ -public class OdcVmTraceFormat : TraceFormat { - /** - * A [JsonFactory] that is used to parse the JSON-based interference model. - */ - private val jsonFactory = JsonFactory() - - /** - * The name of this trace format. - */ - override val name: String = "opendc-vm" - - override fun create(path: Path) { - // Construct directory containing the trace files - Files.createDirectories(path) - - val tables = getTables(path) - - for (table in tables) { - val writer = newWriter(path, table) - writer.close() - } - } - - override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceSubmissionTime, TableColumnType.Instant), - TableColumn(resourceDuration, TableColumnType.Long), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceCpuCapacity, TableColumnType.Double), - TableColumn(resourceMemCapacity, TableColumnType.Double), - TableColumn(resourceGpuCount, TableColumnType.Int), - TableColumn(resourceGpuCapacity, TableColumnType.Double), - TableColumn(resourceParents, TableColumnType.Set(TableColumnType.String)), - TableColumn(resourceChildren, TableColumnType.Set(TableColumnType.String)), - TableColumn(resourceNature, TableColumnType.String), - TableColumn(resourceDeadline, TableColumnType.Long), - ), - ) - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceStateDuration, TableColumnType.Duration), - TableColumn(resourceCpuCount, TableColumnType.Int), - 
TableColumn(resourceStateCpuUsage, TableColumnType.Double), - ), - ) - TABLE_INTERFERENCE_GROUPS -> - TableDetails( - listOf( - TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), - TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), - TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("tasks.parquet"), ResourceReadSupport(projection)) - OdcVmResourceTableReader(reader) - } - TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("fragments.parquet"), ResourceStateReadSupport(projection)) - OdcVmResourceStateTableReader(reader) - } - TABLE_INTERFERENCE_GROUPS -> { - val modelPath = path.resolve("interference-model.json") - val parser = - if (modelPath.exists()) { - jsonFactory.createParser(modelPath.toFile()) - } else { - jsonFactory.createParser("[]") // If model does not exist, return empty model - } - - OdcVmInterferenceJsonTableReader(parser) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - return when (table) { - TABLE_RESOURCES -> { - val writer = - LocalParquetWriter.builder(path.resolve("tasks.parquet"), ResourceWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() - OdcVmResourceTableWriter(writer) - } - TABLE_RESOURCE_STATES -> { - val writer = - LocalParquetWriter.builder(path.resolve("fragments.parquet"), ResourceStateWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", 
true) - .withBloomFilterEnabled("id", true) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() - OdcVmResourceStateTableWriter(writer) - } - TABLE_INTERFERENCE_GROUPS -> { - val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) - OdcVmInterferenceJsonTableWriter(generator) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt deleted file mode 100644 index d727920a..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import java.time.Instant - -/** - * A description of a resource in a trace. - */ -internal data class Resource( - val id: String, - val submissionTime: Instant, - val durationTime: Long, - val cpuCount: Int, - val cpuCapacity: Double, - val memCapacity: Double, - val gpuCount: Int = 0, - val gpuCapacity: Double = 0.0, - val parents: Set = emptySet(), - val children: Set = emptySet(), - val nature: String? = null, - val deadline: Long = -1, -) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt deleted file mode 100644 index cd2ccef7..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Type -import org.apache.parquet.schema.Types -import org.opendc.trace.TableColumn -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceSubmissionTime - -/** - * A [ReadSupport] instance for [Resource] objects. - */ -internal class ResourceReadSupport(private val projection: List?) : ReadSupport() { - /** - * Mapping from field names to [TableColumn]s. 
- */ - private val fieldMap = - mapOf( - "id" to resourceID, - "submissionTime" to resourceSubmissionTime, - "submission_time" to resourceSubmissionTime, - "duration" to resourceDuration, - "maxCores" to resourceCpuCount, - "cpu_count" to resourceCpuCount, - "cpu_capacity" to resourceCpuCapacity, - "requiredMemory" to resourceMemCapacity, - "mem_capacity" to resourceMemCapacity, - "gpu_count" to resourceGpuCount, - "gpu_capacity" to resourceGpuCapacity, - "parents" to resourceParents, - "children" to resourceChildren, - "nature" to resourceNature, - "deadline" to resourceDeadline, - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val projectionSet = projection.toSet() - - for (field in READ_SCHEMA.fields) { - val col = fieldMap[field.name] ?: continue - if (col in projectionSet) { - addField(field) - } - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer = ResourceRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema (version 2.0) for the "resources" table in the trace. 
- */ - @JvmStatic - val READ_SCHEMA_V2_0: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submissionTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("maxCores"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("requiredMemory"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("nature"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("deadline"), - ) - .named("resource") - - /** - * Parquet read schema (version 2.1) for the "resources" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_2: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submission_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("gpu_count"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("gpu_capacity"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField( - 
Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("element"), - ) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField( - Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("element"), - ) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("nature"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("deadline"), - ) - .named("resource") - - /** - * Parquet read schema for the "resources" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - READ_SCHEMA_V2_2 - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt deleted file mode 100644 index f9493721..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Instant - -/** - * A [RecordMaterializer] for [Resource] records. - */ -internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer() { - /** - * State of current record being read. - */ - private var localId = "" - private var localSubmissionTime = Instant.MIN - private var localDuration = 0L - private var localCpuCount = 0 - private var localCpuCapacity = 0.0 - private var localMemCapacity = 0.0 - private var localGpuCount = 0 - private var localGpuCapacity = 0.0 - private var localParents = mutableSetOf() - private var localChildren = mutableSetOf() - private var localNature: String? = null - private var localDeadline = -1L - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. 
- */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - localId = value.toStringUsingUTF8() - } - } - "submission_time", "submissionTime" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localSubmissionTime = Instant.ofEpochMilli(value) - } - } - "duration" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localDuration = value - } - } - "cpu_count", "maxCores" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localCpuCount = value - } - } - "cpu_capacity" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localCpuCapacity = value - } - } - "mem_capacity", "requiredMemory" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localMemCapacity = value - } - - override fun addLong(value: Long) { - localMemCapacity = value.toDouble() - } - } - "gpu_count", "gpuMaxCores" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localGpuCount = value - } - } - "gpu_capacity" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localGpuCapacity = value - } - } - "parents" -> RelationConverter(localParents) - "children" -> RelationConverter(localChildren) - "nature" -> - object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - localNature = value.toStringUsingUTF8() - } - } - "deadline" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localDeadline = value - } - } - else -> error("Unknown column $type") - } - } - - override fun start() { - localId = "" - localSubmissionTime = Instant.MIN - localDuration = 0L - localCpuCount = 0 - localCpuCapacity = 0.0 - localMemCapacity = 0.0 - localGpuCount = 0 - localGpuCapacity = 0.0 - localParents.clear() - localChildren.clear() - localNature = null - localDeadline = -1 - } - - override fun end() 
{} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): Resource = - Resource( - localId, - localSubmissionTime, - localDuration, - localCpuCount, - localCpuCapacity, - localMemCapacity, - localGpuCount, - localGpuCapacity, - localParents.toSet(), - localChildren.toSet(), - localNature, - localDeadline, - ) - - override fun getRootConverter(): GroupConverter = root - - /** - * Helper class to convert parent and child relations and add them to [relations]. - */ - private class RelationConverter(private val relations: MutableSet) : GroupConverter() { - private val entryConverter = - object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - val str = value.toStringUsingUTF8() - relations.add(str) - } - } - - private val listGroupConverter = - object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - // fieldIndex = 0 corresponds to "element" - require(fieldIndex == 0) - return entryConverter - } - - override fun start() {} - - override fun end() {} - } - - override fun getConverter(fieldIndex: Int): Converter { - // fieldIndex = 0 corresponds to "list" - require(fieldIndex == 0) - return listGroupConverter - } - - override fun start() {} - - override fun end() {} - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt deleted file mode 100644 index 10fc6be4..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation 
the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import java.time.Duration -import java.time.Instant - -internal class ResourceState( - val id: String, - val timestamp: Instant, - val duration: Duration, - val cpuCount: Int, - val cpuUsage: Double, - val gpuCount: Int, - val gpuUsage: Double, -) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt deleted file mode 100644 index 53e594de..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the 
Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.trace.TableColumn -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp - -/** - * A [ReadSupport] instance for [ResourceState] objects. - */ -internal class ResourceStateReadSupport(private val projection: List?) : ReadSupport() { - /** - * Mapping from field names to [TableColumn]s. 
- */ - private val fieldMap = - mapOf( - "id" to resourceID, - "time" to resourceStateTimestamp, - "timestamp" to resourceStateTimestamp, - "duration" to resourceStateDuration, - "cores" to resourceCpuCount, - "cpu_count" to resourceCpuCount, - "cpuUsage" to resourceStateCpuUsage, - "cpu_usage" to resourceStateCpuUsage, - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val projectionSet = projection.toSet() - - for (field in READ_SCHEMA.fields) { - val col = fieldMap[field.name] ?: continue - if (col in projectionSet) { - addField(field) - } - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer = ResourceStateRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema (version 2.0) for the "resource states" table in the trace. 
- */ - @JvmStatic - val READ_SCHEMA_V2_0: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cores"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpuUsage"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("gpuCount"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("gpuUsage"), - ) - .named("resource_state") - - /** - * Parquet read schema (version 2.1) for the "resource states" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_1: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("gpu_count"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("gpu_usage"), - ) - .named("resource_state") - - /** - * Parquet read schema for the "resource states" table in the trace. 
- */ - @JvmStatic - val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1) - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt deleted file mode 100644 index ee5e56aa..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Duration -import java.time.Instant - -/** - * A [RecordMaterializer] for [ResourceState] records. - */ -internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer() { - /** - * State of current record being read. - */ - private var localId = "" - private var localTimestamp = Instant.MIN - private var localDuration = Duration.ZERO - private var localCpuCount = 0 - private var localCpuUsage = 0.0 - private var localGpuCount = 0 - private var localGpuUsage = 0.0 - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. 
- */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - localId = value.toStringUsingUTF8() - } - } - "timestamp", "time" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localTimestamp = Instant.ofEpochMilli(value) - } - } - "duration" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localDuration = Duration.ofMillis(value) - } - } - "cpu_count", "cores" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localCpuCount = value - } - } - "cpu_usage", "cpuUsage" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localCpuUsage = value - } - } - "gpu_count", "gpuCount", "gpu_cores", "gpuCores" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localGpuCount = value - } - } - "gpu_usage", "gpuUsage" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localGpuUsage = value - } - } - else -> error("Unknown column $type") - } - } - - override fun start() { - localId = "" - localTimestamp = Instant.MIN - localDuration = Duration.ZERO - localCpuCount = 0 - localCpuUsage = 0.0 - localGpuCount = 0 - localGpuUsage = 0.0 - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): ResourceState = - ResourceState( - localId, - localTimestamp, - localDuration, - localCpuCount, - localCpuUsage, - localGpuCount, - localGpuUsage, - ) - - override fun getRootConverter(): GroupConverter = root -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt deleted file mode 100644 index 58c43916..00000000 --- 
a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types - -/** - * Support for writing [Resource] instances to Parquet format. - */ -internal class ResourceStateWriteSupport : WriteSupport() { - /** - * The current active record consumer. 
- */ - private lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(WRITE_SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ResourceState) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - record: ResourceState, - ) { - consumer.startMessage() - - consumer.startField("id", 0) - consumer.addBinary(Binary.fromCharSequence(record.id)) - consumer.endField("id", 0) - - consumer.startField("timestamp", 1) - consumer.addLong(record.timestamp.toEpochMilli()) - consumer.endField("timestamp", 1) - - consumer.startField("duration", 2) - consumer.addLong(record.duration.toMillis()) - consumer.endField("duration", 2) - - consumer.startField("cpu_count", 3) - consumer.addInteger(record.cpuCount) - consumer.endField("cpu_count", 3) - - consumer.startField("cpu_usage", 4) - consumer.addDouble(record.cpuUsage) - consumer.endField("cpu_usage", 4) - - consumer.endMessage() - } - - companion object { - /** - * Parquet schema for the "resource states" table in the trace. 
- */ - @JvmStatic - val WRITE_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - ) - .named("resource_state") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt deleted file mode 100644 index c3e984fb..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import kotlin.math.roundToLong - -/** - * Support for writing [Resource] instances to Parquet format. - */ -internal class ResourceWriteSupport : WriteSupport() { - /** - * The current active record consumer. - */ - private lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(WRITE_SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: Resource) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - record: Resource, - ) { - consumer.startMessage() - - consumer.startField("id", 0) - consumer.addBinary(Binary.fromCharSequence(record.id)) - consumer.endField("id", 0) - - consumer.startField("submission_time", 1) - consumer.addLong(record.submissionTime.toEpochMilli()) - consumer.endField("submission_time", 1) - - consumer.startField("duration", 2) - consumer.addLong(record.durationTime) - consumer.endField("duration", 2) - - consumer.startField("cpu_count", 3) - consumer.addInteger(record.cpuCount) - consumer.endField("cpu_count", 3) - - consumer.startField("cpu_capacity", 4) - consumer.addDouble(record.cpuCapacity) - 
consumer.endField("cpu_capacity", 4) - - consumer.startField("mem_capacity", 5) - consumer.addLong(record.memCapacity.roundToLong()) - consumer.endField("mem_capacity", 5) - - record.nature?.let { - consumer.startField("nature", 6) - consumer.addBinary(Binary.fromCharSequence(it)) - consumer.endField("nature", 6) - } - - if (record.deadline != -1L) { - consumer.startField("deadline", 7) - consumer.addLong(record.deadline) - consumer.endField("deadline", 7) - } - - consumer.endMessage() - } - - companion object { - /** - * Parquet schema for the "resources" table in the trace. - */ - @JvmStatic - val WRITE_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submission_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("nature"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("deadline"), - ) - .named("resource") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt deleted file mode 100644 index 5a79fd6f..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission 
is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_STATUS -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import java.io.BufferedReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the SWF format. - */ -internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader { - /** - * A flag to indicate the state of the reader - */ - private var state = State.Pending - - /** - * The current row. 
- */ - private var fields = emptyList() - - /** - * A [Regex] object to match whitespace. - */ - private val whitespace = "\\s+".toRegex() - - override fun nextRow(): Boolean { - var line: String? - var num = 0 - - val state = state - if (state == State.Closed) { - return false - } else if (state == State.Pending) { - this.state = State.Active - } - - while (true) { - line = reader.readLine() - - if (line == null) { - this.state = State.Closed - return false - } - num++ - - if (line.isBlank()) { - // Ignore empty lines - continue - } else if (line.startsWith(";")) { - // Ignore comments for now - continue - } - - break - } - - fields = line!!.trim().split(whitespace) - - if (fields.size < 18) { - throw IllegalArgumentException("Invalid format at line $line") - } - - return true - } - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colJobID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_WAIT_TIME -> colWaitTime - TASK_RUNTIME -> colRunTime - TASK_ALLOC_NCPUS -> colAllocNcpus - TASK_REQ_NCPUS -> colReqNcpus - TASK_STATUS -> colStatus - TASK_USER_ID -> colUserID - TASK_GROUP_ID -> colGroupID - TASK_PARENTS -> colParentJob - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in colJobID..colParentThinkTime) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - check(state == State.Active) { "No active row" } - return when (index) { - colReqNcpus, colAllocNcpus, colStatus, colGroupID, colUserID -> fields[index].toInt(10) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw 
IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String { - check(state == State.Active) { "No active row" } - return when (index) { - colJobID -> fields[index] - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - check(state == State.Active) { "No active row" } - return when (index) { - colSubmitTime -> Instant.ofEpochSecond(fields[index].toLong(10)) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - check(state == State.Active) { "No active row" } - return when (index) { - colWaitTime, colRunTime -> Duration.ofSeconds(fields[index].toLong(10)) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - check(state == State.Active) { "No active row" } - @Suppress("UNCHECKED_CAST") - return when (index) { - colParentJob -> { - require(elementType.isAssignableFrom(String::class.java)) - val parent = fields[index].toLong(10) - if (parent < 0) emptySet() else setOf(parent) - } - else -> throw IllegalArgumentException("Invalid column") - } as Set? - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - state = State.Closed - } - - /** - * Default column indices for the SWF format. 
- */ - private val colJobID = 0 - private val colSubmitTime = 1 - private val colWaitTime = 2 - private val colRunTime = 3 - private val colAllocNcpus = 4 - private val colAvgCpuTime = 5 - private val colUsedMem = 6 - private val colReqNcpus = 7 - private val colReqTime = 8 - private val colReqMem = 9 - private val colStatus = 10 - private val colUserID = 11 - private val colGroupID = 12 - private val colExecNum = 13 - private val colQueueNum = 14 - private val colPartNum = 15 - private val colParentJob = 16 - private val colParentThinkTime = 17 - - private enum class State { - Pending, - Active, - Closed, - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt deleted file mode 100644 index d59b07b4..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_STATUS -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import java.nio.file.Path -import kotlin.io.path.bufferedReader - -/** - * Support for the Standard Workload Format (SWF) in OpenDC. 
- * - * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html - */ -public class SwfTraceFormat : TraceFormat { - override val name: String = "swf" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_STATUS, TableColumnType.Int), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt deleted file mode 100644 index 8f84e51f..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt +++ 
/dev/null @@ -1,314 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import java.time.Duration -import java.time.Instant -import java.util.UUID -import kotlin.math.roundToInt - -/** - * A [TableReader] implementation for the WfCommons workload trace format. 
- */ -internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader { - /** - * The current nesting of the parser. - */ - private var level: ParserLevel = ParserLevel.TOP - - override fun nextRow(): Boolean { - reset() - - var hasJob = false - - while (!hasJob) { - when (level) { - ParserLevel.TOP -> { - val token = parser.nextToken() - - // Check whether the document is not empty and starts with an object - if (token == null) { - parser.close() - break - } else if (token != JsonToken.START_OBJECT) { - throw JsonParseException(parser, "Expected object", parser.currentLocation) - } else { - level = ParserLevel.TRACE - } - } - ParserLevel.TRACE -> { - // Seek for the workflow object in the file - if (!seekWorkflow()) { - parser.close() - break - } else if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object", parser.currentLocation) - } else { - level = ParserLevel.WORKFLOW - } - } - ParserLevel.WORKFLOW -> { - // Seek for the jobs object in the file - level = - if (!seekJobs()) { - ParserLevel.TRACE - } else if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array", parser.currentLocation) - } else { - ParserLevel.JOB - } - } - ParserLevel.JOB -> { - when (parser.nextToken()) { - JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW - JsonToken.START_OBJECT -> { - parseJob() - hasJob = true - break - } - else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation) - } - } - } - } - - return hasJob - } - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_RUNTIME -> colRuntime - TASK_REQ_NCPUS -> colNproc - TASK_PARENTS -> colParents - TASK_CHILDREN -> colChildren - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colChildren) { "Invalid column value" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw 
IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - checkActive() - return when (index) { - colNproc -> cores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String? { - checkActive() - return when (index) { - colID -> id - colWorkflowID -> workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant? { - throw IllegalArgumentException("Invalid column") - } - - override fun getDuration(index: Int): Duration? { - checkActive() - return when (index) { - colRuntime -> runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - checkActive() - return when (index) { - colParents -> typeParents.convertTo(parents, elementType) - colChildren -> typeChildren.convertTo(children, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - parser.close() - } - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" 
} - } - - /** - * Parse the trace and seek until the workflow description. - */ - private fun seekWorkflow(): Boolean { - while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) { - when (parser.currentName) { - "name" -> workflowId = parser.text - "workflow" -> return true - else -> parser.skipChildren() - } - } - - return false - } - - /** - * Parse the workflow description in the file and seek until the first job. - */ - private fun seekJobs(): Boolean { - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "jobs" -> return true - else -> parser.skipChildren() - } - } - - return false - } - - /** - * Parse a single job in the file. - */ - private fun parseJob() { - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "name" -> id = parser.text - "parents" -> parents = parseIds() - "children" -> children = parseIds() - "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong()) - "cores" -> cores = parser.floatValue.roundToInt() - else -> parser.skipChildren() - } - } - } - - /** - * Parse the parents/children of a job. - */ - private fun parseIds(): Set { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array", parser.currentLocation) - } - - val ids = mutableSetOf() - - while (parser.nextToken() != JsonToken.END_ARRAY) { - if (parser.currentToken != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected token", parser.currentLocation) - } - - ids.add(parser.valueAsString) - } - - return ids - } - - private enum class ParserLevel { - TOP, - TRACE, - WORKFLOW, - JOB, - } - - /** - * State fields for the parser. - */ - private var id: String? = null - private var workflowId: String? = null - private var runtime: Duration? = null - private var parents: Set? = null - private var children: Set? 
= null - private var cores = -1 - - private fun reset() { - id = null - runtime = null - parents = null - children = null - cores = -1 - } - - private val colID = 0 - private val colWorkflowID = 1 - private val colRuntime = 3 - private val colNproc = 4 - private val colParents = 5 - private val colChildren = 6 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt deleted file mode 100644 index 2178fac6..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the WfCommons workload trace format. - */ -public class WfFormatTraceFormat : TraceFormat { - /** - * The [JsonFactory] that is used to created JSON parsers. 
- */ - private val factory = JsonFactory() - - override val name: String = "wfformat" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt new file mode 100644 index 00000000..947746c6 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without 
limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload + +import org.opendc.trace.TableReader +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.FRAGMENT_GPU_USAGE +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.formats.workload.parquet.Fragment +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * A [TableReader] implementation for the OpenDC virtual machine trace format. + */ +internal class FragmentTableReader(private val reader: LocalParquetReader) : TableReader { + /** + * The current record. + */ + private var record: Fragment? 
= null + + override fun nextRow(): Boolean { + try { + val record = reader.read() + this.record = record + + return record != null + } catch (e: Throwable) { + this.record = null + throw e + } + } + + private val colID = 0 + private val colDuration = 1 + private val colCpuUsage = 2 + private val colGpuUsage = 3 + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> colID + FRAGMENT_DURATION -> colDuration + FRAGMENT_CPU_USAGE -> colCpuUsage + FRAGMENT_GPU_USAGE -> colGpuUsage + else -> -1 + } + } + + override fun isNull(index: Int): Boolean { + require(index in 0..colGpuUsage) { "Invalid column index" } + return false + } + + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getInt(index: Int): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colID -> record.id + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getDouble(index: Int): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colCpuUsage -> record.cpuUsage + colGpuUsage -> record.gpuUsage + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun getString(index: Int): String { + throw IllegalArgumentException("Invalid column index $index") + } + + override fun getUUID(index: Int): UUID?
{ + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getInstant(index: Int): Instant { + throw IllegalArgumentException("Invalid column index $index") + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colDuration -> record.duration + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getList( + index: Int, + elementType: Class, + ): List? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getSet( + index: Int, + elementType: Class, + ): Set? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getMap( + index: Int, + keyType: Class, + valueType: Class, + ): Map? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "OdcVmResourceStateTableReader" +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt new file mode 100644 index 00000000..33cd9e17 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice 
and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload + +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.TableWriter +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.FRAGMENT_GPU_USAGE +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.formats.workload.parquet.Fragment +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class FragmentTableWriter(private val writer: ParquetWriter) : TableWriter { + /** + * The current state for the record that is being written. 
+ */ + private var localIsActive = false + private var localID: Int = -99 + private var localDuration: Duration = Duration.ZERO + private var localCpuUsage: Double = Double.NaN + private var localGpuUsage: Double = Double.NaN + + override fun startRow() { + localIsActive = true + localID = -99 + localDuration = Duration.ZERO + localCpuUsage = Double.NaN + localGpuUsage = Double.NaN + } + + override fun endRow() { + check(localIsActive) { "No active row" } + localIsActive = false + + check(lastId != localID) { "Records need to be ordered by (id, timestamp)" } + + writer.write(Fragment(localID, localDuration, localCpuUsage, localGpuUsage)) + + lastId = localID + } + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> colID + FRAGMENT_DURATION -> colDuration + FRAGMENT_CPU_USAGE -> colCpuUsage + FRAGMENT_GPU_USAGE -> colGpuUsage + else -> -1 + } + } + + override fun setBoolean( + index: Int, + value: Boolean, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInt( + index: Int, + value: Int, + ) { + check(localIsActive) { "No active row" } + when (index) { + colID -> localID = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setLong( + index: Int, + value: Long, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setFloat( + index: Int, + value: Float, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setDouble( + index: Int, + value: Double, + ) { + check(localIsActive) { "No active row" } + when (index) { + colCpuUsage -> localCpuUsage = value + colGpuUsage -> localGpuUsage = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setString( + index: Int, + value: String, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override 
fun setUUID( + index: Int, + value: UUID, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant( + index: Int, + value: Instant, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setDuration( + index: Int, + value: Duration, + ) { + check(localIsActive) { "No active row" } + + when (index) { + colDuration -> localDuration = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setList( + index: Int, + value: List, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setSet( + index: Int, + value: Set, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setMap( + index: Int, + value: Map, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Last column values that are used to check for correct partitioning. + */ + private var lastId: Int? 
= null + + private val colID = 0 + private val colDuration = 1 + private val colCpuUsage = 2 + private val colGpuUsage = 3 +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt new file mode 100644 index 00000000..6c700b0c --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.formats.workload + +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NAME +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME +import org.opendc.trace.formats.workload.parquet.Task +import org.opendc.trace.util.convertTo +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. + */ +internal class TaskTableReader(private val reader: LocalParquetReader) : TableReader { + /** + * The current record. + */ + private var record: Task? 
= null + + override fun nextRow(): Boolean { + try { + val record = reader.read() + this.record = record + + return record != null + } catch (e: Throwable) { + this.record = null + throw e + } + } + + private val colID = 0 + private val colName = 1 + private val colSubmissionTime = 2 + private val colDurationTime = 3 + private val colCpuCount = 4 + private val colCpuCapacity = 5 + private val colMemCapacity = 6 + private val colGpuCapacity = 7 + private val colGpuCount = 8 + private val colParents = 9 + private val colChildren = 10 + private val colNature = 11 + private val colDeadline = 12 + + private val typeParents = TableColumnType.Set(TableColumnType.Int) + private val typeChildren = TableColumnType.Set(TableColumnType.Int) + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> colID + TASK_NAME -> colName + TASK_SUBMISSION_TIME -> colSubmissionTime + TASK_DURATION -> colDurationTime + TASK_CPU_COUNT -> colCpuCount + TASK_CPU_CAPACITY -> colCpuCapacity + TASK_MEM_CAPACITY -> colMemCapacity + TASK_GPU_COUNT -> colGpuCount + TASK_GPU_CAPACITY -> colGpuCapacity + TASK_PARENTS -> colParents + TASK_CHILDREN -> colChildren + TASK_NATURE -> colNature + TASK_DEADLINE -> colDeadline + else -> -1 + } + } + + override fun isNull(index: Int): Boolean { + require(index in 0..colDeadline) { "Invalid column index" } + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colNature -> record.nature == null + colDeadline -> record.deadline == -1L + else -> false + } + } + + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colID -> record.id + colCpuCount -> record.cpuCount + colGpuCount -> record.gpuCount + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + val 
record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colDurationTime -> record.durationTime + colDeadline -> record.deadline + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colCpuCapacity -> record.cpuCapacity + colMemCapacity -> record.memCapacity + colGpuCapacity -> record.gpuCapacity + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getString(index: Int): String? { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colName -> record.name + colNature -> record.nature + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colSubmissionTime -> record.submissionTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun getList( + index: Int, + elementType: Class, + ): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet( + index: Int, + elementType: Class, + ): Set? { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colParents -> typeParents.convertTo(record.parents, elementType) + colChildren -> typeChildren.convertTo(record.children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getMap( + index: Int, + keyType: Class, + valueType: Class, + ): Map? 
{ + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "OdcVmResourceTableReader" +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt new file mode 100644 index 00000000..39be36c1 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.formats.workload + +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.TableWriter +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NAME +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME +import org.opendc.trace.formats.workload.parquet.Task +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class TaskTableWriter(private val writer: ParquetWriter) : TableWriter { + /** + * The current state for the record that is being written. + */ + private var localIsActive = false + private var localId: Int = -99 + private var localName: String = "" + private var localSubmissionTime: Instant = Instant.MIN + private var localDuration: Long = 0L + private var localCpuCount: Int = 0 + private var localCpuCapacity: Double = Double.NaN + private var localMemCapacity: Double = Double.NaN + private var localGpuCount: Int = 0 + private var localGpuCapacity: Double = Double.NaN + private var localParents = mutableSetOf() + private var localChildren = mutableSetOf() + private var localNature: String? 
= null + private var localDeadline: Long = -1 + + override fun startRow() { + localIsActive = true + localId = -99 + localName = "" + localSubmissionTime = Instant.MIN + localDuration = 0L + localCpuCount = 0 + localCpuCapacity = Double.NaN + localMemCapacity = Double.NaN + localGpuCount = 0 + localGpuCapacity = Double.NaN + localParents.clear() + localChildren.clear() + localNature = null + localDeadline = -1L + } + + override fun endRow() { + check(localIsActive) { "No active row" } + localIsActive = false + writer.write( + Task( + localId, + localName, + localSubmissionTime, + localDuration, + localCpuCount, + localCpuCapacity, + localMemCapacity, + localGpuCount, + localGpuCapacity, + localParents, + localChildren, + localNature, + localDeadline, + ), + ) + } + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> colID + TASK_NAME -> colName + TASK_SUBMISSION_TIME -> colSubmissionTime + TASK_DURATION -> colDuration + TASK_CPU_COUNT -> colCpuCount + TASK_CPU_CAPACITY -> colCpuCapacity + TASK_MEM_CAPACITY -> colMemCapacity + TASK_GPU_COUNT -> colGpuCount + TASK_GPU_CAPACITY -> colGpuCapacity + TASK_PARENTS -> colParents + TASK_CHILDREN -> colChildren + TASK_NATURE -> colNature + TASK_DEADLINE -> colDeadline + else -> -1 + } + } + + override fun setBoolean( + index: Int, + value: Boolean, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInt( + index: Int, + value: Int, + ) { + check(localIsActive) { "No active row" } + when (index) { + colID -> localId = value + colCpuCount -> localCpuCount = value + colGpuCount -> localGpuCount = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setLong( + index: Int, + value: Long, + ) { + check(localIsActive) { "No active row" } + when (index) { + colDuration -> localDuration = value + colDeadline -> localDeadline = value + else -> throw IllegalArgumentException("Invalid column index 
$index") + } + } + + override fun setFloat( + index: Int, + value: Float, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setDouble( + index: Int, + value: Double, + ) { + check(localIsActive) { "No active row" } + when (index) { + colCpuCapacity -> localCpuCapacity = value + colMemCapacity -> localMemCapacity = value + colGpuCapacity -> localGpuCapacity = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setString( + index: Int, + value: String, + ) { + check(localIsActive) { "No active row" } + when (index) { + colName -> localName = value + colNature -> localNature = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setUUID( + index: Int, + value: UUID, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant( + index: Int, + value: Instant, + ) { + check(localIsActive) { "No active row" } + when (index) { + colSubmissionTime -> localSubmissionTime = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setDuration( + index: Int, + value: Duration, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setList( + index: Int, + value: List, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setSet( + index: Int, + value: Set, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setMap( + index: Int, + value: Map, + ) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + private val colID = 0 + private val colName = 1 + private val colSubmissionTime = 2 + private val colDuration = 3 + private val colCpuCount = 4 + private val 
colCpuCapacity = 5 + private val colMemCapacity = 6 + private val colGpuCount = 7 + private val colGpuCapacity = 8 + private val colParents = 9 + private val colChildren = 10 + private val colNature = 11 + private val colDeadline = 12 +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt new file mode 100644 index 00000000..7af0650e --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.formats.workload + +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.TABLE_FRAGMENTS +import org.opendc.trace.conv.TABLE_TASKS +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME +import org.opendc.trace.formats.workload.parquet.FragmentReadSupport +import org.opendc.trace.formats.workload.parquet.FragmentWriteSupport +import org.opendc.trace.formats.workload.parquet.TaskReadSupport +import org.opendc.trace.formats.workload.parquet.TaskWriteSupport +import org.opendc.trace.spi.TableDetails +import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.nio.file.Files +import java.nio.file.Path + +/** + * A [TraceFormat] implementation of the OpenDC virtual machine trace format. + */ +public class WorkloadTraceFormat : TraceFormat { + /** + * The name of this trace format. 
+ */
+    override val name: String = "workload"
+
+    override fun create(path: Path) {
+        // Construct directory containing the trace files
+        Files.createDirectories(path)
+
+        val tables = getTables(path)
+
+        for (table in tables) {
+            val writer = newWriter(path, table)
+            writer.close()
+        }
+    }
+
+    override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS, TABLE_FRAGMENTS)
+
+    override fun getDetails(
+        path: Path,
+        table: String,
+    ): TableDetails {
+        return when (table) {
+            TABLE_TASKS ->
+                TableDetails(
+                    listOf(
+                        TableColumn(TASK_ID, TableColumnType.String),
+                        TableColumn(TASK_SUBMISSION_TIME, TableColumnType.Instant),
+                        TableColumn(TASK_DURATION, TableColumnType.Long),
+                        TableColumn(TASK_CPU_COUNT, TableColumnType.Int),
+                        TableColumn(TASK_CPU_CAPACITY, TableColumnType.Double),
+                        TableColumn(TASK_MEM_CAPACITY, TableColumnType.Double),
+                        TableColumn(TASK_GPU_COUNT, TableColumnType.Int),
+                        TableColumn(TASK_GPU_CAPACITY, TableColumnType.Double),
+                        TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+                        TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+                        TableColumn(TASK_NATURE, TableColumnType.String),
+                        TableColumn(TASK_DEADLINE, TableColumnType.Long),
+                    ),
+                )
+            TABLE_FRAGMENTS ->
+                TableDetails(
+                    listOf(
+                        TableColumn(TASK_ID, TableColumnType.String),
+                        TableColumn(FRAGMENT_DURATION, TableColumnType.Duration),
+                        TableColumn(TASK_CPU_COUNT, TableColumnType.Int),
+                        TableColumn(FRAGMENT_CPU_USAGE, TableColumnType.Double),
+                    ),
+                )
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newReader(
+        path: Path,
+        table: String,
+        projection: List<String>?,
+    ): TableReader {
+        return when (table) {
+            TABLE_TASKS -> {
+                val reader = LocalParquetReader(path.resolve("tasks.parquet"), TaskReadSupport(projection))
+                TaskTableReader(reader)
+            }
+            TABLE_FRAGMENTS -> {
+                val reader = LocalParquetReader(path.resolve("fragments.parquet"), FragmentReadSupport(projection))
+                FragmentTableReader(reader)
+            }
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newWriter(
+        path: Path,
+        table: String,
+    ): TableWriter {
+        return when (table) {
+            TABLE_TASKS -> {
+                val writer =
+                    LocalParquetWriter.builder(path.resolve("tasks.parquet"), TaskWriteSupport())
+                        .withCompressionCodec(CompressionCodecName.ZSTD)
+                        .withPageWriteChecksumEnabled(true)
+                        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .build()
+                TaskTableWriter(writer)
+            }
+            TABLE_FRAGMENTS -> {
+                val writer =
+                    LocalParquetWriter.builder(path.resolve("fragments.parquet"), FragmentWriteSupport())
+                        .withCompressionCodec(CompressionCodecName.ZSTD)
+                        .withDictionaryEncoding("id", true)
+                        .withBloomFilterEnabled("id", true)
+                        .withPageWriteChecksumEnabled(true)
+                        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .build()
+                FragmentTableWriter(writer)
+            }
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt
new file mode 100644
index 00000000..44385088
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The
above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import java.time.Duration + +internal class Fragment( + val id: Int, + val duration: Duration, + val cpuUsage: Double, + val gpuUsage: Double, +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt new file mode 100644 index 00000000..3fa914bc --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.Types +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.TASK_ID + +/** + * A [ReadSupport] instance for [Fragment] objects. + */ +internal class FragmentReadSupport(private val projection: List?) : ReadSupport() { + /** + * Mapping from field names to [TableColumn]s. 
+ */ + private val fieldMap = + mapOf( + "id" to TASK_ID, + "duration" to FRAGMENT_DURATION, + "cpuUsage" to FRAGMENT_CPU_USAGE, + "cpu_usage" to FRAGMENT_CPU_USAGE, + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in FRAGMENT_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(FRAGMENT_SCHEMA.name) + } else { + FRAGMENT_SCHEMA + } + + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer = FragmentRecordMaterializer(readContext.requestedSchema) +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt new file mode 100644 index 00000000..7902cab1 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter +import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import java.time.Duration +import java.time.Instant + +/** + * A [RecordMaterializer] for [Fragment] records. + */ +internal class FragmentRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. + */ + private var localId = -99 + private var localTimestamp = Instant.MIN + private var localDuration = Duration.ZERO + private var localCpuCount = 0 + private var localCpuUsage = 0.0 + private var localGpuCount = 0 + private var localGpuUsage = 0.0 + + /** + * Root converter for the record. + */ + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. 
+ */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localId = value + } + } + "timestamp", "time" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localTimestamp = Instant.ofEpochMilli(value) + } + } + "duration" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localDuration = Duration.ofMillis(value) + } + } + "cpu_count", "cores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localCpuCount = value + } + } + "cpu_usage", "cpuUsage" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCpuUsage = value + } + } + "gpu_count", "gpuCount", "gpu_cores", "gpuCores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localGpuCount = value + } + } + "gpu_usage", "gpuUsage" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localGpuUsage = value + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + localId = -99 + localDuration = Duration.ZERO + localCpuCount = 0 + localCpuUsage = 0.0 + localGpuCount = 0 + localGpuUsage = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): Fragment = + Fragment( + localId, + localDuration, + localCpuUsage, + localGpuUsage, + ) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt new file mode 100644 index 00000000..cd499e7e --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2025 
AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Types + +private val FRAGMENT_SCHEMA_v1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_usage"), + ) + .named("resource_state") + +private val FRAGMENT_SCHEMA_v2: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_usage"), + ) + .named("resource_state") + +/** + * Parquet read schema for the "resource states" table in the trace. 
+ */
+public val FRAGMENT_SCHEMA: MessageType = FRAGMENT_SCHEMA_v2
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt
new file mode 100644
index 00000000..e6b7ba4f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.workload.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+
+/**
+ * Support for writing [Fragment] instances to Parquet format.
+ */
+internal class FragmentWriteSupport : WriteSupport<Fragment>() {
+    /**
+     * The current active record consumer.
+     */
+    private lateinit var recordConsumer: RecordConsumer
+
+    override fun init(configuration: Configuration): WriteContext {
+        return WriteContext(WRITE_SCHEMA, emptyMap())
+    }
+
+    override fun prepareForWrite(recordConsumer: RecordConsumer) {
+        this.recordConsumer = recordConsumer
+    }
+
+    override fun write(record: Fragment) {
+        write(recordConsumer, record)
+    }
+
+    private fun write(
+        consumer: RecordConsumer,
+        record: Fragment,
+    ) {
+        consumer.startMessage()
+
+        // Field indices must match the field positions in [WRITE_SCHEMA].
+        consumer.startField("id", 0)
+        consumer.addInteger(record.id)
+        consumer.endField("id", 0)
+
+        consumer.startField("duration", 1)
+        consumer.addLong(record.duration.toMillis())
+        consumer.endField("duration", 1)
+
+        consumer.startField("cpu_usage", 2)
+        consumer.addDouble(record.cpuUsage)
+        consumer.endField("cpu_usage", 2)
+
+        consumer.startField("gpu_usage", 3)
+        consumer.addDouble(record.gpuUsage)
+        consumer.endField("gpu_usage", 3)
+
+        consumer.endMessage()
+    }
+
+    companion object {
+        /**
+         * Parquet write schema for the "fragments" table, kept in sync with [FRAGMENT_SCHEMA].
+         */
+        @JvmStatic
+        val WRITE_SCHEMA: MessageType =
+            Types.buildMessage()
+                .addFields(
+                    Types
+                        .required(PrimitiveType.PrimitiveTypeName.INT32)
+                        .named("id"),
+                    Types
+                        .required(PrimitiveType.PrimitiveTypeName.INT64)
+                        .named("duration"),
+                    Types
+                        .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+                        .named("cpu_usage"),
+                    Types
+                        .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+                        .named("gpu_usage"),
+                )
+                .named("resource_state")
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
new file mode 100644
index 00000000..f661d5a9
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import java.time.Instant + +/** + * A description of a resource in a trace. + */ +internal data class Task( + val id: Int, + val name: String, + val submissionTime: Instant, + val durationTime: Long, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Double, + val gpuCount: Int = 0, + val gpuCapacity: Double = 0.0, + val parents: Set = emptySet(), + val children: Set = emptySet(), + val nature: String? = null, + val deadline: Long = -1, +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt new file mode 100644 index 00000000..4bbb18ac --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.Types +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NAME +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME + +/** + * A [ReadSupport] instance for [Task] objects. + */ +internal class TaskReadSupport(private val projection: List?) : ReadSupport() { + /** + * Mapping from field names to [TableColumn]s. 
+ */ + private val fieldMap = + mapOf( + "id" to TASK_ID, + "name" to TASK_NAME, + "submissionTime" to TASK_SUBMISSION_TIME, + "submission_time" to TASK_SUBMISSION_TIME, + "duration" to TASK_DURATION, + "maxCores" to TASK_CPU_COUNT, + "cpu_count" to TASK_CPU_COUNT, + "cpu_capacity" to TASK_CPU_CAPACITY, + "requiredMemory" to TASK_MEM_CAPACITY, + "mem_capacity" to TASK_MEM_CAPACITY, + "gpu_count" to TASK_GPU_COUNT, + "gpu_capacity" to TASK_GPU_CAPACITY, + "parents" to TASK_PARENTS, + "children" to TASK_CHILDREN, + "nature" to TASK_NATURE, + "deadline" to TASK_DEADLINE, + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in TASK_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(TASK_SCHEMA.name) + } else { + TASK_SCHEMA + } + + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer = TaskRecordMaterializer(readContext.requestedSchema) +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt new file mode 100644 index 00000000..12dc54b7 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, 
merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter +import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import java.time.Instant + +/** + * A [RecordMaterializer] for [Task] records. + */ +internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. + */ + private var localId = -99 + private var localName = "" + private var localSubmissionTime = Instant.MIN + private var localDuration = 0L + private var localCpuCount = 0 + private var localCpuCapacity = 0.0 + private var localMemCapacity = 0.0 + private var localGpuCount = 0 + private var localGpuCapacity = 0.0 + private var localParents = mutableSetOf() + private var localChildren = mutableSetOf() + private var localNature: String? = null + private var localDeadline = -1L + + /** + * Root converter for the record. 
+ */ + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localId = value + } + } + "name" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localName = value.toStringUsingUTF8() + } + } + "submission_time", "submissionTime" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localSubmissionTime = Instant.ofEpochMilli(value) + } + } + "duration" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localDuration = value + } + } + "cpu_count", "maxCores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localCpuCount = value + } + } + "cpu_capacity" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCpuCapacity = value + } + } + "mem_capacity", "requiredMemory" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localMemCapacity = value + } + + override fun addLong(value: Long) { + localMemCapacity = value.toDouble() + } + } + "gpu_count", "gpuMaxCores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localGpuCount = value + } + } + "gpu_capacity" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localGpuCapacity = value + } + } + "parents" -> RelationConverter(localParents) + "children" -> RelationConverter(localChildren) + "nature" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localNature = value.toStringUsingUTF8() + } + } + "deadline" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localDeadline = value + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + localId = -99 + localName = "" + localSubmissionTime = Instant.MIN + 
localDuration = 0L + localCpuCount = 0 + localCpuCapacity = 0.0 + localMemCapacity = 0.0 + localGpuCount = 0 + localGpuCapacity = 0.0 + localParents.clear() + localChildren.clear() + localNature = null + localDeadline = -1 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): Task = + Task( + localId, + localName, + localSubmissionTime, + localDuration, + localCpuCount, + localCpuCapacity, + localMemCapacity, + localGpuCount, + localGpuCapacity, + localParents.toSet(), + localChildren.toSet(), + localNature, + localDeadline, + ) + + override fun getRootConverter(): GroupConverter = root + + /** + * Helper class to convert parent and child relations and add them to [relations]. + */ + private class RelationConverter(private val relations: MutableSet) : GroupConverter() { + private val entryConverter = + object : PrimitiveConverter() { + override fun addInt(value: Int) { + relations.add(value) + } + } + + private val listGroupConverter = + object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + // fieldIndex = 0 corresponds to "element" + require(fieldIndex == 0) + return entryConverter + } + + override fun start() {} + + override fun end() {} + } + + override fun getConverter(fieldIndex: Int): Converter { + // fieldIndex = 0 corresponds to "list" + require(fieldIndex == 0) + return listGroupConverter + } + + override fun start() {} + + override fun end() {} + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt new file mode 100644 index 00000000..f7f5e953 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby 
granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Types + +private val TASK_SCHEMA_V1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submission_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_capacity"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), + ) 
+ .named("resource") + +private val TASK_SCHEMA_V2: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("name"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submission_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_capacity"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional( + PrimitiveType.PrimitiveTypeName.INT32, + ) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional( + PrimitiveType.PrimitiveTypeName.INT32, + ) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), + ) + .named("resource") + +public val TASK_SCHEMA: MessageType = TASK_SCHEMA_V2 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt 
b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt new file mode 100644 index 00000000..a7ce62b8 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import kotlin.math.roundToLong + +/** + * Support for writing [Task] instances to Parquet format. + */ +internal class TaskWriteSupport : WriteSupport() { + /** + * The current active record consumer. 
+ */ + private lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(TASK_SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: Task) { + write(recordConsumer, record) + } + + private fun write( + consumer: RecordConsumer, + record: Task, + ) { + consumer.startMessage() + + consumer.startField("id", 0) + consumer.addInteger(record.id) + consumer.endField("id", 0) + + consumer.startField("submission_time", 1) + consumer.addLong(record.submissionTime.toEpochMilli()) + consumer.endField("submission_time", 1) + + consumer.startField("duration", 2) + consumer.addLong(record.durationTime) + consumer.endField("duration", 2) + + consumer.startField("cpu_count", 3) + consumer.addInteger(record.cpuCount) + consumer.endField("cpu_count", 3) + + consumer.startField("cpu_capacity", 4) + consumer.addDouble(record.cpuCapacity) + consumer.endField("cpu_capacity", 4) + + consumer.startField("mem_capacity", 5) + consumer.addLong(record.memCapacity.roundToLong()) + consumer.endField("mem_capacity", 5) + + record.nature?.let { + consumer.startField("nature", 6) + consumer.addBinary(Binary.fromCharSequence(it)) + consumer.endField("nature", 6) + } + + if (record.deadline != -1L) { + consumer.startField("deadline", 7) + consumer.addLong(record.deadline) + consumer.endField("deadline", 7) + } + + consumer.endMessage() + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt deleted file mode 100644 index 95582388..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of 
charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.Task -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the WTF format. 
- */ -internal class WtfTaskTableReader(private val reader: LocalParquetReader) : TableReader { - /** - * The current record. - */ - private var record: Task? = null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colWorkflowID = 1 - private val colSubmitTime = 2 - private val colWaitTime = 3 - private val colRuntime = 4 - private val colReqNcpus = 5 - private val colParents = 6 - private val colChildren = 7 - private val colGroupID = 8 - private val colUserID = 9 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_WAIT_TIME -> colWaitTime - TASK_RUNTIME -> colRuntime - TASK_REQ_NCPUS -> colReqNcpus - TASK_PARENTS -> colParents - TASK_CHILDREN -> colChildren - TASK_GROUP_ID -> colGroupID - TASK_USER_ID -> colUserID - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in colID..colUserID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colReqNcpus -> record.requestedCpus - colGroupID -> record.groupId - colUserID -> record.userId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw 
IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colID -> record.id - colWorkflowID -> record.workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colSubmitTime -> record.submitTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colWaitTime -> record.waitTime - colRuntime -> record.runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - throw IllegalArgumentException("Invalid column") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colParents -> typeParents.convertTo(record.parents, elementType) - colChildren -> typeChildren.convertTo(record.children, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? 
{ - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt deleted file mode 100644 index 1386d2ef..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.TaskReadSupport -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the Workflow Trace Format (WTF). - */ -public class WtfTraceFormat : TraceFormat { - override val name: String = "wtf" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int), - ), - ) - else -> throw 
IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List?, - ): TableReader { - return when (table) { - TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false) - WtfTaskTableReader(reader) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt deleted file mode 100644 index a1db0cab..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import java.time.Duration -import java.time.Instant - -/** - * A task in the Workflow Trace Format. - */ -internal data class Task( - val id: String, - val workflowId: String, - val submitTime: Instant, - val waitTime: Duration, - val runtime: Duration, - val requestedCpus: Int, - val groupId: Int, - val userId: Int, - val parents: Set, - val children: Set, -) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt deleted file mode 100644 index 1f9c506d..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Type -import org.apache.parquet.schema.Types -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID - -/** - * A [ReadSupport] instance for [Task] objects. - * - * @param projection The projection of the table to read. - */ -internal class TaskReadSupport(private val projection: List?) : ReadSupport() { - /** - * Mapping of table columns to their Parquet column names. 
- */ - private val colMap = - mapOf( - TASK_ID to "id", - TASK_WORKFLOW_ID to "workflow_id", - TASK_SUBMIT_TIME to "ts_submit", - TASK_WAIT_TIME to "wait_time", - TASK_RUNTIME to "runtime", - TASK_REQ_NCPUS to "resource_amount_requested", - TASK_PARENTS to "parents", - TASK_CHILDREN to "children", - TASK_GROUP_ID to "group_id", - TASK_USER_ID to "user_id", - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } - - for (col in projection) { - val fieldName = colMap[col] ?: continue - addField(fieldByName.getValue(fieldName)) - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer = TaskRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. 
- */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("workflow_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("ts_submit"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("wait_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("runtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("resource_amount_requested"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("user_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("group_id"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents"), - ) - .named("task") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt deleted file mode 100644 index 412a4f8b..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the 
"Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Duration -import java.time.Instant -import kotlin.math.roundToInt -import kotlin.math.roundToLong - -/** - * A [RecordMaterializer] for [Task] records. - */ -internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer() { - /** - * State of current record being read. - */ - private var localID = "" - private var localWorkflowID = "" - private var localSubmitTime = Instant.MIN - private var localWaitTime = Duration.ZERO - private var localRuntime = Duration.ZERO - private var localRequestedCpus = 0 - private var localGroupId = 0 - private var localUserId = 0 - private var localParents = mutableSetOf() - private var localChildren = mutableSetOf() - - /** - * Root converter for the record. 
- */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localID = value.toString() - } - } - "workflow_id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWorkflowID = value.toString() - } - } - "ts_submit" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localSubmitTime = Instant.ofEpochMilli(value) - } - } - "wait_time" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWaitTime = Duration.ofMillis(value) - } - } - "runtime" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localRuntime = Duration.ofMillis(value) - } - } - "resource_amount_requested" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localRequestedCpus = value.roundToInt() - } - } - "group_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localGroupId = value - } - } - "user_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localUserId = value - } - } - "children" -> RelationConverter(localChildren) - "parents" -> RelationConverter(localParents) - else -> error("Unknown column $type") - } - } - - override fun start() { - localID = "" - localWorkflowID = "" - localSubmitTime = Instant.MIN - localWaitTime = Duration.ZERO - localRuntime = Duration.ZERO - localRequestedCpus = 0 - localGroupId = 0 - localUserId = 0 - localParents.clear() - localChildren.clear() - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): Task = - Task( - localID, - localWorkflowID, - localSubmitTime, - localWaitTime, - localRuntime, - localRequestedCpus, - localGroupId, - localUserId, - 
localParents.toSet(), - localChildren.toSet(), - ) - - override fun getRootConverter(): GroupConverter = root - - /** - * Helper class to convert parent and child relations and add them to [relations]. - */ - private class RelationConverter(private val relations: MutableSet) : GroupConverter() { - private val entryConverter = - object : PrimitiveConverter() { - override fun addLong(value: Long) { - relations.add(value.toString()) - } - - override fun addDouble(value: Double) { - relations.add(value.roundToLong().toString()) - } - } - - private val listConverter = - object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return entryConverter - } - - override fun start() {} - - override fun end() {} - } - - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return listConverter - } - - override fun start() {} - - override fun end() {} - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index e586f90a..945d8f2f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -24,15 +24,9 @@ package org.opendc.trace.spi import org.opendc.trace.TableReader import org.opendc.trace.TableWriter -import org.opendc.trace.azure.AzureTraceFormat -import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.formats.carbon.CarbonTraceFormat import org.opendc.trace.formats.failure.FailureTraceFormat -import org.opendc.trace.formats.opendc.OdcVmTraceFormat -import org.opendc.trace.gwf.GwfTraceFormat -import org.opendc.trace.swf.SwfTraceFormat -import org.opendc.trace.wfformat.WfFormatTraceFormat -import org.opendc.trace.wtf.WtfTraceFormat +import org.opendc.trace.formats.workload.WorkloadTraceFormat import 
java.nio.file.Path import java.util.ServiceLoader @@ -122,15 +116,9 @@ public interface TraceFormat { @JvmStatic public fun byName(name: String): TraceFormat? { return when (name) { - "azure" -> AzureTraceFormat() - "bitbrains" -> BitbrainsTraceFormat() "carbon" -> CarbonTraceFormat() "failure" -> FailureTraceFormat() - "gwf" -> GwfTraceFormat() - "opendc-vm" -> OdcVmTraceFormat() - "swf" -> SwfTraceFormat() - "wfformat" -> WfFormatTraceFormat() - "wtf" -> WtfTraceFormat() + "workload" -> WorkloadTraceFormat() else -> null } } -- cgit v1.2.3