summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api/src/main/kotlin/org
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-07-22 15:47:44 +0200
committerGitHub <noreply@github.com>2025-07-22 15:47:44 +0200
commite22c97dcca7478d6941b78bdf7cd873bc0d23cdc (patch)
treef1859c16f4c7973d8b16ed693caad4c749d42331 /opendc-trace/opendc-trace-api/src/main/kotlin/org
parent0c0cf25616771cd40a9e401edcba4a5e5016f76e (diff)
Updated workload schema (#360)
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main/kotlin/org')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt)10
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt)19
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt115
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt103
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt19
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt56
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt219
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt246
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt147
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt292
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt135
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt365
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt175
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt159
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt14
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt20
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt)2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt)41
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt)8
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt43
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt30
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt44
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt286
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt104
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt225
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt192
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt202
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt214
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt161
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt236
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt100
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt314
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt95
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt)57
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt)72
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt)90
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt)95
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt165
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt)10
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt79
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt)23
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt80
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt)24
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt)11
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt101
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt)36
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt166
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt)57
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt187
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt102
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt42
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt148
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt188
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt16
55 files changed, 963 insertions, 5179 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt
index de74c4fd..32cdd78b 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonIntensityColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/CarbonColumns.kt
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-@file:JvmName("CarbonIntensityColumns")
+@file:JvmName("CarbonColumns")
package org.opendc.trace.conv
/**
- * A column containing the task identifier.
+ * A column containing the timestamp of the carbon intensity measurement.
*/
-public const val CARBON_INTENSITY_TIMESTAMP: String = "timestamp"
+public const val CARBON_TIMESTAMP: String = "timestamp"
/**
- * A column containing the task identifier.
+ * A column containing the intensity of the carbon when sampled.
*/
-public const val CARBON_INTENSITY_VALUE: String = "carbon_intensity"
+public const val CARBON_INTENSITY: String = "carbon_intensity"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt
index fbbfdea9..e0d01ef2 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/FragmentColumns.kt
@@ -20,21 +20,26 @@
* SOFTWARE.
*/
-@file:JvmName("InterferenceGroupColumns")
+@file:JvmName("FragmentColumns")
package org.opendc.trace.conv
/**
- * Members of the interference group.
+ * Duration for the fragment.
*/
-public const val INTERFERENCE_GROUP_MEMBERS: String = "members"
+public const val FRAGMENT_DURATION: String = "duration"
/**
- * Target load after which the interference occurs.
+ * Total CPU usage during the fragment in MHz.
*/
-public const val INTERFERENCE_GROUP_TARGET: String = "target"
+public const val FRAGMENT_CPU_USAGE: String = "cpu_usage"
/**
- * Performance score when the interference occurs.
+ * Total GPU usage during the fragment in MHz.
*/
-public const val INTERFERENCE_GROUP_SCORE: String = "score"
+public const val FRAGMENT_GPU_USAGE: String = "gpu_usage"
+
+/**
+ * Memory usage during the fragment in KB.
+ */
+public const val FRAGMENT_MEM_USAGE: String = "mem_usage"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
deleted file mode 100644
index 3d0341b2..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("ResourceColumns")
-
-package org.opendc.trace.conv
-
-/**
- * Identifier of the resource.
- */
-@JvmField
-public val resourceID: String = "id"
-
-/**
- * The cluster to which the resource belongs.
- */
-@JvmField
-public val resourceClusterID: String = "cluster_id"
-
-/**
- * Start time for the resource.
- */
-@JvmField
-public val resourceSubmissionTime: String = "submission_time"
-
-/**
- * Carbon intensity of the resource.
- */
-@JvmField
-public val resourceCarbonIntensity: String = "carbon_intensity"
-
-/**
- * End time for the resource.
- */
-@JvmField
-public val resourceDuration: String = "duration"
-
-/**
- * Number of CPUs for the resource.
- */
-@JvmField
-public val resourceCpuCount: String = "cpu_count"
-
-/**
- * Total CPU capacity of the resource in MHz.
- */
-@JvmField
-public val resourceCpuCapacity: String = "cpu_capacity"
-
-/**
- * Memory capacity for the resource in KB.
- */
-@JvmField
-public val resourceMemCapacity: String = "mem_capacity"
-
-/**
- * Number of GPU cores for the resource.
- */
-@JvmField
-public val resourceGpuCount: String = "gpu_count"
-
-/**
- * Total GPU capacity of the resource in MHz.
- */
-@JvmField
-public val resourceGpuCapacity: String = "gpu_capacity"
-
-/**
- * Total GPU memory capacity of the resource in MB.
- */
-@JvmField
-public val resourceGpuMemCapacity: String = "gpu_mem_capacity"
-
-/**
- * The parents of the resource that need to be completed before this resource can be used.
- */
-@JvmField
-public val resourceParents: String = "parents"
-
-/**
- * The children of the resource that cannot be started before this is completed.
- */
-@JvmField
-public val resourceChildren: String = "children"
-
-/**
- * Nature of the task. Delayable, interruptible, etc.
- */
-@JvmField
-public val resourceNature: String = "nature"
-
-/**
- * Deadline of the task.
- */
-@JvmField
-public val resourceDeadline: String = "deadline"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
deleted file mode 100644
index f4ab7759..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("ResourceStateColumns")
-
-package org.opendc.trace.conv
-
-/**
- * The timestamp at which the state was recorded.
- */
-@JvmField
-public val resourceStateTimestamp: String = "timestamp"
-
-/**
- * Duration for the state.
- */
-@JvmField
-public val resourceStateDuration: String = "duration"
-
-/**
- * A flag to indicate that the resource is powered on.
- */
-@JvmField
-public val resourceStatePoweredOn: String = "powered_on"
-
-/**
- * Total CPU usage of the resource in MHz.
- */
-@JvmField
-public val resourceStateCpuUsage: String = "cpu_usage"
-
-/**
- * Total CPU usage of the resource in percentage.
- */
-@JvmField
-public val resourceStateCpuUsagePct: String = "cpu_usage_pct"
-
-/**
- * Total CPU demand of the resource in MHz.
- */
-@JvmField
-public val resourceStateCpuDemand: String = "cpu_demand"
-
-/**
- * CPU ready percentage.
- */
-@JvmField
-public val resourceStateCpuReadyPct: String = "cpu_ready_pct"
-
-/**
- * Memory usage of the resource in KB.
- */
-@JvmField
-public val resourceStateMemUsage: String = "mem_usage"
-
-/**
- * Disk read throughput of the resource in KB/s.
- */
-@JvmField
-public val resourceStateDiskRead: String = "disk_read"
-
-/**
- * Disk write throughput of the resource in KB/s.
- */
-@JvmField
-public val resourceStateDiskWrite: String = "disk_write"
-
-/**
- * Network receive throughput of the resource in KB/s.
- */
-@JvmField
-public val resourceStateNetRx: String = "net_rx"
-
-/**
- * Network transmit throughput of the resource in KB/s.
- */
-@JvmField
-public val resourceStateNetTx: String = "net_tx"
-
-/**
- * Total GPU capacity of the resource in MHz.
- */
-@JvmField
-public val resourceStateGpuUsage: String = "gpu_usage"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
index d4019f73..310d268a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/Tables.kt
@@ -25,30 +25,21 @@
package org.opendc.trace.conv
/**
- * A table containing all workflows in a workload.
- */
-public const val TABLE_WORKFLOWS: String = "workflows"
-
-/**
* A table containing all tasks in a workload.
*/
public const val TABLE_TASKS: String = "tasks"
/**
- * A table containing all resources in a workload.
+ * A table containing all fragments in a workload.
*/
-public const val TABLE_RESOURCES: String = "resources"
+public const val TABLE_FRAGMENTS: String = "fragments"
/**
- * A table containing all resource states in a workload.
+ * A table containing the carbon intensities of the region.
*/
-public const val TABLE_RESOURCE_STATES: String = "resource_states"
+public const val TABLE_CARBON: String = "carbon"
/**
- * A table containing the groups of resources that interfere when run on the same execution platform.
+ * A table containing failures that can be injected during simulation.
*/
-public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups"
-
-public const val TABLE_CARBON_INTENSITIES: String = "carbon_intensities"
-
public const val TABLE_FAILURES: String = "failures"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
index 6ca87a60..0df52c71 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
@@ -25,61 +25,71 @@
package org.opendc.trace.conv
/**
- * A column containing the task identifier.
+ * Identifier of the task.
*/
public const val TASK_ID: String = "id"
/**
- * A column containing the identifier of the workflow.
+ * Name of the task.
*/
-public const val TASK_WORKFLOW_ID: String = "workflow_id"
+public const val TASK_NAME: String = "name"
/**
- * A column containing the submission time of the task.
+ * The time of submission of the task.
*/
-public const val TASK_SUBMIT_TIME: String = "submit_time"
+public const val TASK_SUBMISSION_TIME: String = "submission_time"
/**
- * A column containing the wait time of the task.
+ * The duration of a task in ms
*/
-public const val TASK_WAIT_TIME: String = "wait_time"
+public const val TASK_DURATION: String = "duration"
/**
- * A column containing the runtime time of the task.
+ * Number of CPUs for the task.
*/
-public const val TASK_RUNTIME: String = "runtime"
+public const val TASK_CPU_COUNT: String = "cpu_count"
/**
- * A column containing the parents of a task.
+ * Total CPU capacity of the task in MHz.
*/
-public const val TASK_PARENTS: String = "parents"
+public const val TASK_CPU_CAPACITY: String = "cpu_capacity"
/**
- * A column containing the children of a task.
+ * Memory capacity for the task in KB.
*/
-public const val TASK_CHILDREN: String = "children"
+public const val TASK_MEM_CAPACITY: String = "mem_capacity"
+
+/**
+ * Number of GPU cores for the task.
+ */
+public const val TASK_GPU_COUNT: String = "gpu_count"
/**
- * A column containing the requested CPUs of a task.
+ * Total GPU capacity of the task in MHz.
*/
-public const val TASK_REQ_NCPUS: String = "req_ncpus"
+public const val TASK_GPU_CAPACITY: String = "gpu_capacity"
/**
- * A column containing the allocated CPUs of a task.
+ * Total GPU memory capacity of the task in MB.
*/
-public const val TASK_ALLOC_NCPUS: String = "alloc_ncpus"
+public const val TASK_GPU_MEM_CAPACITY: String = "gpu_mem_capacity"
/**
- * A column containing the status of a task.
+ * The parents of the task that need to be completed before this task can be used.
*/
-public const val TASK_STATUS: String = "status"
+public const val TASK_PARENTS: String = "parents"
+
+/**
+ * The children of the task that cannot be started before this is completed.
+ */
+public const val TASK_CHILDREN: String = "children"
/**
- * A column containing the group id of a task.
+ * Nature of the task. Delayable, interruptible, etc.
*/
-public const val TASK_GROUP_ID: String = "group_id"
+public const val TASK_NATURE: String = "nature"
/**
- * A column containing the user id of a task.
+ * Deadline of the task.
*/
-public const val TASK_USER_ID: String = "user_id"
+public const val TASK_DEADLINE: String = "deadline"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt
deleted file mode 100644
index bcf6ff52..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.azure
-
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceStateCpuUsagePct
-import org.opendc.trace.conv.resourceStateTimestamp
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] for the Azure v1 VM resource state table.
- */
-internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
- /**
- * A flag to indicate whether a single row has been read already.
- */
- private var isStarted = false
-
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- if (!isStarted) {
- isStarted = true
- }
-
- reset()
-
- if (!nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
- "vm id" -> id = parser.text
- "CPU avg cpu" -> cpuUsagePct = (parser.doubleValue / 100.0) // Convert from % to [0, 1]
- }
- }
-
- return true
- }
-
- private val colID = 0
- private val colTimestamp = 1
- private val colCpuUsagePct = 2
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceStateTimestamp -> colTimestamp
- resourceStateCpuUsagePct -> colCpuUsagePct
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colCpuUsagePct) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- checkActive()
- return when (index) {
- colCpuUsagePct -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getString(index: Int): String? {
- checkActive()
- return when (index) {
- colID -> id
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- checkActive()
- return when (index) {
- colTimestamp -> timestamp
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Helper method to check if the reader is active.
- */
- private fun checkActive() {
- check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var timestamp: Instant? = null
- private var cpuUsagePct = Double.NaN
-
- /**
- * Reset the state.
- */
- private fun reset() {
- id = null
- timestamp = null
- cpuUsagePct = Double.NaN
- }
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema =
- CsvSchema.builder()
- .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm id", CsvSchema.ColumnType.STRING)
- .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt
deleted file mode 100644
index 55f26fa6..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.azure
-
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceDuration
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceSubmissionTime
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] for the Azure v1 VM resources table.
- */
-internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
- /**
- * A flag to indicate whether a single row has been read already.
- */
- private var isStarted = false
-
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- if (!isStarted) {
- isStarted = true
- }
-
- reset()
-
- if (!nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "vm id" -> id = parser.text
- "timestamp vm created" -> startTime = Instant.ofEpochSecond(parser.longValue)
- "timestamp vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue)
- "vm virtual core count" -> cpuCores = parser.intValue
- "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB
- }
- }
-
- return true
- }
-
- private val colID = 0
- private val colStartTime = 1
- private val colStopTime = 2
- private val colCpuCount = 3
- private val colMemCapacity = 4
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceSubmissionTime -> colStartTime
- resourceDuration -> colStopTime
- resourceCpuCount -> colCpuCount
- resourceMemCapacity -> colMemCapacity
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colMemCapacity) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- checkActive()
- return when (index) {
- colCpuCount -> cpuCores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- checkActive()
- return when (index) {
- colCpuCount -> cpuCores.toLong()
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- checkActive()
- return when (index) {
- colMemCapacity -> memCapacity
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getString(index: Int): String? {
- checkActive()
- return when (index) {
- colID -> id
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- checkActive()
- return when (index) {
- colStartTime -> startTime
- colStopTime -> stopTime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Helper method to check if the reader is active.
- */
- private fun checkActive() {
- check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var startTime: Instant? = null
- private var stopTime: Instant? = null
- private var cpuCores = -1
- private var memCapacity = Double.NaN
-
- /**
- * Reset the state.
- */
- private fun reset() {
- id = null
- startTime = null
- stopTime = null
- cpuCores = -1
- memCapacity = Double.NaN
- }
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema =
- CsvSchema.builder()
- .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
- .addColumn("subscription id", CsvSchema.ColumnType.STRING)
- .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
- .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt
deleted file mode 100644
index 7ce1c11a..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_RESOURCES
-import org.opendc.trace.conv.TABLE_RESOURCE_STATES
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceDuration
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStateCpuUsagePct
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.conv.resourceSubmissionTime
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.CompositeTableReader
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import java.util.zip.GZIPInputStream
-import kotlin.io.path.inputStream
-import kotlin.io.path.name
-
-/**
- * A format implementation for the Azure v1 format.
- */
-public class AzureTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "azure"
-
- /**
- * The [CsvFactory] used to create the parser.
- */
- private val factory =
- CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_RESOURCES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceSubmissionTime, TableColumnType.Instant),
- TableColumn(resourceDuration, TableColumnType.Instant),
- TableColumn(resourceCpuCount, TableColumnType.Int),
- TableColumn(resourceMemCapacity, TableColumnType.Double),
- ),
- )
- TABLE_RESOURCE_STATES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceStateTimestamp, TableColumnType.Instant),
- TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_RESOURCES -> {
- val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
- AzureResourceTableReader(factory.createParser(stream))
- }
- TABLE_RESOURCE_STATES -> newResourceStateReader(path)
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- /**
- * Construct a [TableReader] for reading over all VM CPU readings.
- */
- private fun newResourceStateReader(path: Path): TableReader {
- val partitions =
- Files.walk(path.resolve("vm_cpu_readings"), 1)
- .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") }
- .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it }))
- .toSortedMap()
- val it = partitions.iterator()
-
- return object : CompositeTableReader() {
- override fun nextReader(): TableReader? {
- return if (it.hasNext()) {
- val (_, partPath) = it.next()
- val stream = GZIPInputStream(partPath.inputStream())
- return AzureResourceStateTableReader(factory.createParser(stream))
- } else {
- null
- }
- }
-
- override fun toString(): String = "AzureCompositeTableReader"
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt
deleted file mode 100644
index 8387d1ed..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceClusterID
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStateCpuDemand
-import org.opendc.trace.conv.resourceStateCpuReadyPct
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateCpuUsagePct
-import org.opendc.trace.conv.resourceStateDiskRead
-import org.opendc.trace.conv.resourceStateDiskWrite
-import org.opendc.trace.conv.resourceStateTimestamp
-import java.io.BufferedReader
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] for the Bitbrains resource state table.
- */
-internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader {
- private var state = State.Pending
-
- override fun nextRow(): Boolean {
- val state = state
- if (state == State.Closed) {
- return false
- } else if (state == State.Pending) {
- this.state = State.Active
- }
-
- reset()
-
- var line: String?
- var num = 0
-
- while (true) {
- line = reader.readLine()
-
- if (line == null) {
- this.state = State.Closed
- return false
- }
-
- num++
-
- if (line[0] == '#' || line.isBlank()) {
- // Ignore empty lines or comments
- continue
- }
-
- break
- }
-
- line = line!!.trim()
-
- val length = line.length
- var col = 0
- var start: Int
- var end = 0
-
- while (end < length) {
- // Trim all whitespace before the field
- start = end
- while (start < length && line[start].isWhitespace()) {
- start++
- }
-
- end = line.indexOf(' ', start)
-
- if (end < 0) {
- end = length
- }
-
- val field = line.subSequence(start, end) as String
- when (col++) {
- colTimestamp -> timestamp = Instant.ofEpochSecond(field.toLong(10))
- colCpuUsage -> cpuUsage = field.toDouble()
- colCpuDemand -> cpuDemand = field.toDouble()
- colDiskRead -> diskRead = field.toDouble()
- colDiskWrite -> diskWrite = field.toDouble()
- colClusterID -> cluster = field.trim()
- colNcpus -> cpuCores = field.toInt(10)
- colCpuReadyPct -> cpuReadyPct = field.toDouble()
- colPoweredOn -> poweredOn = field.toInt(10) == 1
- colCpuCapacity -> cpuCapacity = field.toDouble()
- colID -> id = field.trim()
- colMemCapacity -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB
- }
- }
-
- return true
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceClusterID -> colClusterID
- resourceStateTimestamp -> colTimestamp
- resourceCpuCount -> colNcpus
- resourceCpuCapacity -> colCpuCapacity
- resourceStateCpuUsage -> colCpuUsage
- resourceStateCpuUsagePct -> colCpuUsagePct
- resourceStateCpuDemand -> colCpuDemand
- resourceStateCpuReadyPct -> colCpuReadyPct
- resourceMemCapacity -> colMemCapacity
- resourceStateDiskRead -> colDiskRead
- resourceStateDiskWrite -> colDiskWrite
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0 until colMax) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colPoweredOn -> poweredOn
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getInt(index: Int): Int {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colNcpus -> cpuCores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colCpuCapacity -> cpuCapacity
- colCpuUsage -> cpuUsage
- colCpuUsagePct -> cpuUsage / cpuCapacity
- colCpuReadyPct -> cpuReadyPct
- colCpuDemand -> cpuDemand
- colMemCapacity -> memCapacity
- colDiskRead -> diskRead
- colDiskWrite -> diskWrite
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getString(index: Int): String? {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colID -> id
- colClusterID -> cluster
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colTimestamp -> timestamp
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- reader.close()
- reset()
- state = State.Closed
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var cluster: String? = null
- private var timestamp: Instant? = null
- private var cpuCores = -1
- private var cpuCapacity = Double.NaN
- private var cpuUsage = Double.NaN
- private var cpuDemand = Double.NaN
- private var cpuReadyPct = Double.NaN
- private var memCapacity = Double.NaN
- private var diskRead = Double.NaN
- private var diskWrite = Double.NaN
- private var poweredOn: Boolean = false
-
- /**
- * Reset the state of the reader.
- */
- private fun reset() {
- id = null
- timestamp = null
- cluster = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuDemand = Double.NaN
- cpuReadyPct = Double.NaN
- memCapacity = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- poweredOn = false
- }
-
- /**
- * Default column indices for the extended Bitbrains format.
- */
- private val colTimestamp = 0
- private val colCpuUsage = 1
- private val colCpuDemand = 2
- private val colDiskRead = 4
- private val colDiskWrite = 6
- private val colClusterID = 10
- private val colNcpus = 12
- private val colCpuReadyPct = 13
- private val colPoweredOn = 14
- private val colCpuCapacity = 18
- private val colID = 19
- private val colMemCapacity = 20
- private val colCpuUsagePct = 21
- private val colMax = colCpuUsagePct + 1
-
- private enum class State {
- Pending,
- Active,
- Closed,
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt
deleted file mode 100644
index 6115953f..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_RESOURCE_STATES
-import org.opendc.trace.conv.resourceClusterID
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStateCpuDemand
-import org.opendc.trace.conv.resourceStateCpuReadyPct
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateCpuUsagePct
-import org.opendc.trace.conv.resourceStateDiskRead
-import org.opendc.trace.conv.resourceStateDiskWrite
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.CompositeTableReader
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.bufferedReader
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * A format implementation for the extended Bitbrains trace format.
- */
-public class BitbrainsExTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "bitbrains-ex"
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_RESOURCE_STATES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceClusterID, TableColumnType.String),
- TableColumn(resourceStateTimestamp, TableColumnType.Instant),
- TableColumn(resourceCpuCount, TableColumnType.Int),
- TableColumn(resourceCpuCapacity, TableColumnType.Double),
- TableColumn(resourceStateCpuUsage, TableColumnType.Double),
- TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
- TableColumn(resourceStateCpuDemand, TableColumnType.Double),
- TableColumn(resourceStateCpuReadyPct, TableColumnType.Double),
- TableColumn(resourceMemCapacity, TableColumnType.Double),
- TableColumn(resourceStateDiskRead, TableColumnType.Double),
- TableColumn(resourceStateDiskWrite, TableColumnType.Double),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_RESOURCE_STATES -> newResourceStateReader(path)
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- /**
- * Construct a [TableReader] for reading over all resource state partitions.
- */
- private fun newResourceStateReader(path: Path): TableReader {
- val partitions =
- Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "txt" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
- val it = partitions.iterator()
-
- return object : CompositeTableReader() {
- override fun nextReader(): TableReader? {
- return if (it.hasNext()) {
- val (_, partPath) = it.next()
- return BitbrainsExResourceStateTableReader(partPath.bufferedReader())
- } else {
- null
- }
- }
-
- override fun toString(): String = "BitbrainsExCompositeTableReader"
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt
deleted file mode 100644
index e264fccb..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateCpuUsagePct
-import org.opendc.trace.conv.resourceStateDiskRead
-import org.opendc.trace.conv.resourceStateDiskWrite
-import org.opendc.trace.conv.resourceStateMemUsage
-import org.opendc.trace.conv.resourceStateNetRx
-import org.opendc.trace.conv.resourceStateNetTx
-import org.opendc.trace.conv.resourceStateTimestamp
-import java.text.NumberFormat
-import java.time.Duration
-import java.time.Instant
-import java.time.LocalDateTime
-import java.time.ZoneOffset
-import java.time.format.DateTimeFormatter
-import java.time.format.DateTimeParseException
-import java.util.Locale
-import java.util.UUID
-
-/**
- * A [TableReader] for the Bitbrains resource state table.
- */
-internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
- /**
- * A flag to indicate whether a single row has been read already.
- */
- private var isStarted = false
-
- /**
- * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace.
- */
- private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")
-
- /**
- * The type of timestamps in the trace.
- */
- private var timestampType: TimestampType = TimestampType.UNDECIDED
-
- /**
- * The [NumberFormat] used to parse doubles containing a comma.
- */
- private val nf = NumberFormat.getInstance(Locale.GERMAN)
-
- /**
- * A flag to indicate that the trace contains decimals with a comma separator.
- */
- private var usesCommaDecimalSeparator = false
-
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- if (!isStarted) {
- isStarted = true
- }
-
- // Reset the row state
- reset()
-
- if (!nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "Timestamp [ms]" -> {
- timestamp =
- when (timestampType) {
- TimestampType.UNDECIDED -> {
- try {
- val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
- timestampType = TimestampType.DATE_TIME
- res
- } catch (e: DateTimeParseException) {
- timestampType = TimestampType.EPOCH_MILLIS
- Instant.ofEpochSecond(parser.longValue)
- }
- }
- TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
- TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
- }
- }
- "CPU cores" -> cpuCores = parser.intValue
- "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
- "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble()
- "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1]
- "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble()
- "Memory usage [KB]" -> memUsage = parseSafeDouble()
- "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble()
- "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble()
- "Network received throughput [KB/s]" -> netReceived = parseSafeDouble()
- "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble()
- }
- }
-
- return true
- }
-
- private val colTimestamp = 0
- private val colCpuCount = 1
- private val colCpuCapacity = 2
- private val colCpuUsage = 3
- private val colCpuUsagePct = 4
- private val colMemCapacity = 5
- private val colMemUsage = 6
- private val colDiskRead = 7
- private val colDiskWrite = 8
- private val colNetRx = 9
- private val colNetTx = 10
- private val colID = 11
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- resourceStateTimestamp -> colTimestamp
- resourceCpuCount -> colCpuCount
- resourceCpuCapacity -> colCpuCapacity
- resourceStateCpuUsage -> colCpuUsage
- resourceStateCpuUsagePct -> colCpuUsagePct
- resourceMemCapacity -> colMemCapacity
- resourceStateMemUsage -> colMemUsage
- resourceStateDiskRead -> colDiskRead
- resourceStateDiskWrite -> colDiskWrite
- resourceStateNetRx -> colNetRx
- resourceStateNetTx -> colNetTx
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colID) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- checkActive()
- return when (index) {
- colCpuCount -> cpuCores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- checkActive()
- return when (index) {
- colCpuCapacity -> cpuCapacity
- colCpuUsage -> cpuUsage
- colCpuUsagePct -> cpuUsagePct
- colMemCapacity -> memCapacity
- colMemUsage -> memUsage
- colDiskRead -> diskRead
- colDiskWrite -> diskWrite
- colNetRx -> netReceived
- colNetTx -> netTransmitted
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getString(index: Int): String {
- checkActive()
- return when (index) {
- colID -> partition
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- checkActive()
- return when (index) {
- colTimestamp -> timestamp
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Helper method to check if the reader is active.
- */
- private fun checkActive() {
- check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * Try to parse the current value safely as double.
- */
- private fun parseSafeDouble(): Double {
- if (!usesCommaDecimalSeparator) {
- try {
- return parser.doubleValue
- } catch (e: JsonParseException) {
- usesCommaDecimalSeparator = true
- }
- }
-
- val text = parser.text
- if (text.isBlank()) {
- return 0.0
- }
-
- return nf.parse(text).toDouble()
- }
-
- /**
- * State fields of the reader.
- */
- private var timestamp: Instant? = null
- private var cpuCores = -1
- private var cpuCapacity = Double.NaN
- private var cpuUsage = Double.NaN
- private var cpuUsagePct = Double.NaN
- private var memCapacity = Double.NaN
- private var memUsage = Double.NaN
- private var diskRead = Double.NaN
- private var diskWrite = Double.NaN
- private var netReceived = Double.NaN
- private var netTransmitted = Double.NaN
-
- /**
- * Reset the state.
- */
- private fun reset() {
- timestamp = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuUsagePct = Double.NaN
- memCapacity = Double.NaN
- memUsage = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- netReceived = Double.NaN
- netTransmitted = Double.NaN
- }
-
- /**
- * The type of the timestamp in the trace.
- */
- private enum class TimestampType {
- UNDECIDED,
- DATE_TIME,
- EPOCH_MILLIS,
- }
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema =
- CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
- .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .setUseHeader(true)
- .setColumnSeparator(';')
- .build()
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt
deleted file mode 100644
index a12785f0..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceID
-import java.nio.file.Path
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] for the Bitbrains resource table.
- */
-internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader {
- /**
- * An iterator to iterate over the resource entries.
- */
- private val it = vms.iterator()
-
- /**
- * The state of the reader.
- */
- private var state = State.Pending
-
- override fun nextRow(): Boolean {
- if (state == State.Pending) {
- state = State.Active
- }
-
- reset()
-
- while (it.hasNext()) {
- val (name, path) = it.next()
-
- val parser = factory.createParser(path.toFile())
- val reader = BitbrainsResourceStateTableReader(name, parser)
- val idCol = reader.resolve(resourceID)
-
- try {
- if (!reader.nextRow()) {
- continue
- }
-
- id = reader.getString(idCol)
- return true
- } finally {
- reader.close()
- }
- }
-
- state = State.Closed
- return false
- }
-
- private val colID = 0
-
- override fun resolve(name: String): Int {
- return when (name) {
- resourceID -> colID
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colID) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getString(index: Int): String? {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colID -> id
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- reset()
- state = State.Closed
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
-
- /**
- * Reset the state of the reader.
- */
- private fun reset() {
- id = null
- }
-
- private enum class State {
- Pending,
- Active,
- Closed,
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt
deleted file mode 100644
index 23853077..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.bitbrains
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_RESOURCES
-import org.opendc.trace.conv.TABLE_RESOURCE_STATES
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateCpuUsagePct
-import org.opendc.trace.conv.resourceStateDiskRead
-import org.opendc.trace.conv.resourceStateDiskWrite
-import org.opendc.trace.conv.resourceStateMemUsage
-import org.opendc.trace.conv.resourceStateNetRx
-import org.opendc.trace.conv.resourceStateNetTx
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.CompositeTableReader
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * A format implementation for the GWF trace format.
- */
-public class BitbrainsTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "bitbrains"
-
- /**
- * The [CsvFactory] used to create the parser.
- */
- private val factory =
- CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_RESOURCES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- ),
- )
- TABLE_RESOURCE_STATES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceStateTimestamp, TableColumnType.Instant),
- TableColumn(resourceCpuCount, TableColumnType.Int),
- TableColumn(resourceCpuCapacity, TableColumnType.Double),
- TableColumn(resourceStateCpuUsage, TableColumnType.Double),
- TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
- TableColumn(resourceMemCapacity, TableColumnType.Double),
- TableColumn(resourceStateMemUsage, TableColumnType.Double),
- TableColumn(resourceStateDiskRead, TableColumnType.Double),
- TableColumn(resourceStateDiskWrite, TableColumnType.Double),
- TableColumn(resourceStateNetRx, TableColumnType.Double),
- TableColumn(resourceStateNetTx, TableColumnType.Double),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_RESOURCES -> {
- val vms =
- Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
- BitbrainsResourceTableReader(factory, vms)
- }
- TABLE_RESOURCE_STATES -> newResourceStateReader(path)
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- /**
- * Construct a [TableReader] for reading over all resource state partitions.
- */
- private fun newResourceStateReader(path: Path): TableReader {
- val partitions =
- Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
- val it = partitions.iterator()
-
- return object : CompositeTableReader() {
- override fun nextReader(): TableReader? {
- return if (it.hasNext()) {
- val (partition, partPath) = it.next()
- return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile()))
- } else {
- null
- }
- }
-
- override fun toString(): String = "BitbrainsCompositeTableReader"
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt
index 226c8806..c5face9f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTableReader.kt
@@ -23,9 +23,9 @@
package org.opendc.trace.formats.carbon
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
-import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
-import org.opendc.trace.formats.carbon.parquet.CarbonIntensityFragment
+import org.opendc.trace.conv.CARBON_INTENSITY
+import org.opendc.trace.conv.CARBON_TIMESTAMP
+import org.opendc.trace.formats.carbon.parquet.CarbonFragment
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
import java.time.Instant
@@ -34,11 +34,11 @@ import java.util.UUID
/**
* A [TableReader] implementation for the WTF format.
*/
-internal class CarbonTableReader(private val reader: LocalParquetReader<CarbonIntensityFragment>) : TableReader {
+internal class CarbonTableReader(private val reader: LocalParquetReader<CarbonFragment>) : TableReader {
/**
* The current record.
*/
- private var record: CarbonIntensityFragment? = null
+ private var record: CarbonFragment? = null
override fun nextRow(): Boolean {
try {
@@ -57,8 +57,8 @@ internal class CarbonTableReader(private val reader: LocalParquetReader<CarbonIn
override fun resolve(name: String): Int {
return when (name) {
- CARBON_INTENSITY_TIMESTAMP -> colTimestamp
- CARBON_INTENSITY_VALUE -> colCarbonIntensity
+ CARBON_TIMESTAMP -> colTimestamp
+ CARBON_INTENSITY -> colCarbonIntensity
else -> -1
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
index d8adc739..764bb349 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/CarbonTraceFormat.kt
@@ -26,10 +26,10 @@ import org.opendc.trace.TableColumn
import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
-import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
-import org.opendc.trace.conv.TABLE_CARBON_INTENSITIES
-import org.opendc.trace.formats.carbon.parquet.CarbonIntensityReadSupport
+import org.opendc.trace.conv.CARBON_INTENSITY
+import org.opendc.trace.conv.CARBON_TIMESTAMP
+import org.opendc.trace.conv.TABLE_CARBON
+import org.opendc.trace.formats.carbon.parquet.CarbonReadSupport
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -45,18 +45,18 @@ public class CarbonTraceFormat : TraceFormat {
throw UnsupportedOperationException("Writing not supported for this format")
}
- override fun getTables(path: Path): List<String> = listOf(TABLE_CARBON_INTENSITIES)
+ override fun getTables(path: Path): List<String> = listOf(TABLE_CARBON)
override fun getDetails(
path: Path,
table: String,
): TableDetails {
return when (table) {
- TABLE_CARBON_INTENSITIES ->
+ TABLE_CARBON ->
TableDetails(
listOf(
- TableColumn(CARBON_INTENSITY_TIMESTAMP, TableColumnType.Instant),
- TableColumn(CARBON_INTENSITY_VALUE, TableColumnType.Double),
+ TableColumn(CARBON_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(CARBON_INTENSITY, TableColumnType.Double),
),
)
else -> throw IllegalArgumentException("Table $table not supported")
@@ -69,8 +69,8 @@ public class CarbonTraceFormat : TraceFormat {
projection: List<String>?,
): TableReader {
return when (table) {
- TABLE_CARBON_INTENSITIES -> {
- val reader = LocalParquetReader(path, CarbonIntensityReadSupport(projection))
+ TABLE_CARBON -> {
+ val reader = LocalParquetReader(path, CarbonReadSupport(projection))
CarbonTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt
index 3211cb6c..fe05876b 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityFragment.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonFragment.kt
@@ -27,7 +27,7 @@ import java.time.Instant
/**
* A task in the Workflow Trace Format.
*/
-internal data class CarbonIntensityFragment(
+internal data class CarbonFragment(
val timestamp: Instant,
val carbonIntensity: Double,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt
index 2f4eac05..53087079 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityReadSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonReadSupport.kt
@@ -26,26 +26,24 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
-import org.opendc.trace.conv.CARBON_INTENSITY_TIMESTAMP
-import org.opendc.trace.conv.CARBON_INTENSITY_VALUE
+import org.opendc.trace.conv.CARBON_INTENSITY
+import org.opendc.trace.conv.CARBON_TIMESTAMP
/**
- * A [ReadSupport] instance for [Task] objects.
+ * A [ReadSupport] instance for [CarbonFragment] objects.
*
* @param projection The projection of the table to read.
*/
-internal class CarbonIntensityReadSupport(private val projection: List<String>?) : ReadSupport<CarbonIntensityFragment>() {
+internal class CarbonReadSupport(private val projection: List<String>?) : ReadSupport<CarbonFragment>() {
/**
* Mapping of table columns to their Parquet column names.
*/
private val colMap =
mapOf(
- CARBON_INTENSITY_TIMESTAMP to "timestamp",
- CARBON_INTENSITY_VALUE to "carbon_intensity",
+ CARBON_TIMESTAMP to "timestamp",
+ CARBON_INTENSITY to "carbon_intensity",
)
override fun init(context: InitContext): ReadContext {
@@ -53,16 +51,16 @@ internal class CarbonIntensityReadSupport(private val projection: List<String>?)
if (projection != null) {
Types.buildMessage()
.apply {
- val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+ val fieldByName = CARBON_SCHEMA.fields.associateBy { it.name }
for (col in projection) {
val fieldName = colMap[col] ?: continue
addField(fieldByName.getValue(fieldName))
}
}
- .named(READ_SCHEMA.name)
+ .named(CARBON_SCHEMA.name)
} else {
- READ_SCHEMA
+ CARBON_SCHEMA
}
return ReadContext(projectedSchema)
}
@@ -72,24 +70,5 @@ internal class CarbonIntensityReadSupport(private val projection: List<String>?)
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
readContext: ReadContext,
- ): RecordMaterializer<CarbonIntensityFragment> = CarbonIntensityRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema for the "tasks" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("carbon_intensity"),
- )
- .named("carbon_intensity_fragment")
- }
+ ): RecordMaterializer<CarbonFragment> = CarbonRecordMaterializer(readContext.requestedSchema)
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt
index f5d68f22..aa915a39 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonIntensityRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonRecordMaterializer.kt
@@ -30,9 +30,9 @@ import org.apache.parquet.schema.MessageType
import java.time.Instant
/**
- * A [RecordMaterializer] for [Task] records.
+ * A [RecordMaterializer] for [CarbonFragment] records.
*/
-internal class CarbonIntensityRecordMaterializer(schema: MessageType) : RecordMaterializer<CarbonIntensityFragment>() {
+internal class CarbonRecordMaterializer(schema: MessageType) : RecordMaterializer<CarbonFragment>() {
/**
* State of current record being read.
*/
@@ -76,8 +76,8 @@ internal class CarbonIntensityRecordMaterializer(schema: MessageType) : RecordMa
override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
}
- override fun getCurrentRecord(): CarbonIntensityFragment =
- CarbonIntensityFragment(
+ override fun getCurrentRecord(): CarbonFragment =
+ CarbonFragment(
localTimestamp,
localCarbonIntensity,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt
new file mode 100644
index 00000000..c8b11968
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/carbon/parquet/CarbonSchemas.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.carbon.parquet
+
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+
+private val CARBON_SCHEMA_v1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("carbon_intensity"),
+ )
+ .named("carbon_intensity_fragment")
+
+public val CARBON_SCHEMA: MessageType = CARBON_SCHEMA_v1
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt
index d49f86c6..9bd8fd72 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureReadSupport.kt
@@ -27,14 +27,13 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
import org.opendc.trace.conv.FAILURE_DURATION
import org.opendc.trace.conv.FAILURE_INTENSITY
import org.opendc.trace.conv.FAILURE_INTERVAL
/**
- * A [ReadSupport] instance for [Task] objects.
+ * A [ReadSupport] instance for [FailureFragment] objects.
*
* @param projection The projection of the table to read.
*/
@@ -54,16 +53,16 @@ internal class FailureReadSupport(private val projection: List<String>?) : ReadS
if (projection != null) {
Types.buildMessage()
.apply {
- val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+ val fieldByName = FAILURE_SCHEMA.fields.associateBy { it.name }
for (col in projection) {
val fieldName = colMap[col] ?: continue
addField(fieldByName.getValue(fieldName))
}
}
- .named(READ_SCHEMA.name)
+ .named(FAILURE_SCHEMA.name)
} else {
- READ_SCHEMA
+ FAILURE_SCHEMA
}
return ReadContext(projectedSchema)
}
@@ -74,25 +73,4 @@ internal class FailureReadSupport(private val projection: List<String>?) : ReadS
fileSchema: MessageType,
readContext: ReadContext,
): RecordMaterializer<FailureFragment> = FailureRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema for the "tasks" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("failure_interval"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("failure_duration"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("failure_intensity"),
- )
- .named("failure_fragment")
- }
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt
index 5a00f8c9..83281984 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureRecordMaterializer.kt
@@ -29,7 +29,7 @@ import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType
/**
- * A [RecordMaterializer] for [Task] records.
+ * A [RecordMaterializer] for [FailureFragment] records.
*/
internal class FailureRecordMaterializer(schema: MessageType) : RecordMaterializer<FailureFragment>() {
/**
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt
new file mode 100644
index 00000000..bafac387
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/failure/parquet/FailureSchemas.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.failure.parquet
+
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+
+private val FAILURE_SCHEMA_v1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("failure_interval"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("failure_duration"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("failure_intensity"),
+ )
+ .named("failure_fragment")
+
+public val FAILURE_SCHEMA: MessageType = FAILURE_SCHEMA_v1
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt
deleted file mode 100644
index 8a2a99cb..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.gwf
-
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.TASK_ALLOC_NCPUS
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.util.convertTo
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-import java.util.regex.Pattern
-
-/**
- * A [TableReader] implementation for the GWF format.
- */
-internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
- /**
- * A flag to indicate whether a single row has been read already.
- */
- private var isStarted = false
-
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- if (!isStarted) {
- isStarted = true
- }
-
- // Reset the row state
- reset()
-
- if (parser.isClosed || !nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "WorkflowID" -> workflowId = parser.text
- "JobID" -> jobId = parser.text
- "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue)
- "RunTime" -> runtime = Duration.ofSeconds(parser.longValue)
- "NProcs" -> nProcs = parser.intValue
- "ReqNProcs" -> reqNProcs = parser.intValue
- "Dependencies" -> dependencies = parseParents(parser.valueAsString)
- }
- }
-
- return true
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- TASK_ID -> colJobID
- TASK_WORKFLOW_ID -> colWorkflowID
- TASK_SUBMIT_TIME -> colSubmitTime
- TASK_RUNTIME -> colRuntime
- TASK_ALLOC_NCPUS -> colNproc
- TASK_REQ_NCPUS -> colReqNproc
- TASK_PARENTS -> colDeps
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colDeps) { "Invalid column" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- checkActive()
- return when (index) {
- colReqNproc -> reqNProcs
- colNproc -> nProcs
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getString(index: Int): String? {
- checkActive()
- return when (index) {
- colJobID -> jobId
- colWorkflowID -> workflowId
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- checkActive()
- return when (index) {
- colSubmitTime -> submitTime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration? {
- checkActive()
- return when (index) {
- colRuntime -> runtime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- checkActive()
- return when (index) {
- colDeps -> typeDeps.convertTo(dependencies, elementType)
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Helper method to check if the reader is active.
- */
- private fun checkActive() {
- check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
- }
-
- /**
- * The pattern used to parse the parents.
- */
- private val pattern = Pattern.compile("\\s+")
-
- /**
- * Parse the parents into a set of longs.
- */
- private fun parseParents(value: String): Set<String> {
- val result = mutableSetOf<String>()
- val deps = value.split(pattern)
-
- for (dep in deps) {
- if (dep.isBlank()) {
- continue
- }
-
- result.add(dep)
- }
-
- return result
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * Reader state fields.
- */
- private var workflowId: String? = null
- private var jobId: String? = null
- private var submitTime: Instant? = null
- private var runtime: Duration? = null
- private var nProcs = -1
- private var reqNProcs = -1
- private var dependencies = emptySet<String>()
-
- /**
- * Reset the state.
- */
- private fun reset() {
- workflowId = null
- jobId = null
- submitTime = null
- runtime = null
- nProcs = -1
- reqNProcs = -1
- dependencies = emptySet()
- }
-
- private val colWorkflowID = 0
- private val colJobID = 1
- private val colSubmitTime = 2
- private val colRuntime = 3
- private val colNproc = 4
- private val colReqNproc = 5
- private val colDeps = 6
-
- private val typeDeps = TableColumnType.Set(TableColumnType.String)
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema =
- CsvSchema.builder()
- .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER)
- .addColumn("JobID", CsvSchema.ColumnType.NUMBER)
- .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER)
- .addColumn("RunTime", CsvSchema.ColumnType.NUMBER)
- .addColumn("NProcs", CsvSchema.ColumnType.NUMBER)
- .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER)
- .addColumn("Dependencies", CsvSchema.ColumnType.STRING)
- .setAllowComments(true)
- .setUseHeader(true)
- .setColumnSeparator(',')
- .build()
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt
deleted file mode 100644
index 097c5593..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.gwf
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_TASKS
-import org.opendc.trace.conv.TASK_ALLOC_NCPUS
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import java.nio.file.Path
-
-/**
- * A [TraceFormat] implementation for the GWF trace format.
- */
-public class GwfTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "gwf"
-
- /**
- * The [CsvFactory] used to create the parser.
- */
- private val factory =
- CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_TASKS ->
- TableDetails(
- listOf(
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt
deleted file mode 100644
index dba971d7..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.formats.opendc
-
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.core.JsonParser
-import com.fasterxml.jackson.core.JsonToken
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
-import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.util.convertTo
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] implementation for the OpenDC VM interference JSON format.
- */
-internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader {
- /**
- * A flag to indicate whether a single row has been read already.
- */
- private var isStarted = false
-
- override fun nextRow(): Boolean {
- if (!isStarted) {
- isStarted = true
-
- parser.nextToken()
-
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
- }
- }
-
- return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) {
- parser.close()
- reset()
- false
- } else {
- parseGroup(parser)
- true
- }
- }
-
- private val colMembers = 0
- private val colTarget = 1
- private val colScore = 2
-
- private val typeMembers = TableColumnType.Set(TableColumnType.String)
-
- override fun resolve(name: String): Int {
- return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> colMembers
- INTERFERENCE_GROUP_TARGET -> colTarget
- INTERFERENCE_GROUP_SCORE -> colScore
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- return when (index) {
- colMembers, colTarget, colScore -> false
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getInt(index: Int): Int {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getDouble(index: Int): Double {
- checkActive()
- return when (index) {
- colTarget -> targetLoad
- colScore -> score
- else -> throw IllegalArgumentException("Invalid column $index")
- }
- }
-
- override fun getString(index: Int): String? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getInstant(index: Int): Instant? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun getDuration(index: Int): Duration? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- checkActive()
- return when (index) {
- colMembers -> typeMembers.convertTo(members, elementType)
- else -> throw IllegalArgumentException("Invalid column $index")
- }
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun close() {
- parser.close()
- }
-
- private var members = emptySet<String>()
- private var targetLoad = Double.POSITIVE_INFINITY
- private var score = 1.0
-
- /**
- * Helper method to check if the reader is active.
- */
- private fun checkActive() {
- check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
- }
-
- /**
- * Reset the state.
- */
- private fun reset() {
- members = emptySet()
- targetLoad = Double.POSITIVE_INFINITY
- score = 1.0
- }
-
- /**
- * Parse a group an interference JSON file.
- */
- private fun parseGroup(parser: JsonParser) {
- var targetLoad = Double.POSITIVE_INFINITY
- var score = 1.0
- val members = mutableSetOf<String>()
-
- if (!parser.isExpectedStartObjectToken) {
- throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_OBJECT) {
- when (parser.currentName) {
- "vms" -> parseGroupMembers(parser, members)
- "minServerLoad" -> targetLoad = parser.doubleValue
- "performanceScore" -> score = parser.doubleValue
- }
- }
-
- this.members = members
- this.targetLoad = targetLoad
- this.score = score
- }
-
- /**
- * Parse the members of a group.
- */
- private fun parseGroupMembers(
- parser: JsonParser,
- members: MutableSet<String>,
- ) {
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
- }
-
- while (parser.nextValue() != JsonToken.END_ARRAY) {
- if (parser.currentToken() != JsonToken.VALUE_STRING) {
- throw JsonParseException(parser, "Expected string value for group member")
- }
-
- members.add(parser.text)
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt
deleted file mode 100644
index b3286a1c..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.formats.opendc
-
-import com.fasterxml.jackson.core.JsonGenerator
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
-import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableWriter] implementation for the OpenDC VM interference JSON format.
- */
-internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter {
- /**
- * A flag to indicate whether a row has been started.
- */
- private var isRowActive = false
-
- init {
- generator.writeStartArray()
- }
-
- override fun startRow() {
- // Reset state
- members = emptySet()
- targetLoad = Double.POSITIVE_INFINITY
- score = 1.0
-
- // Mark row as active
- isRowActive = true
- }
-
- override fun endRow() {
- check(isRowActive) { "No active row" }
-
- generator.writeStartObject()
- generator.writeArrayFieldStart("vms")
- for (member in members) {
- generator.writeString(member)
- }
- generator.writeEndArray()
- generator.writeNumberField("minServerLoad", targetLoad)
- generator.writeNumberField("performanceScore", score)
- generator.writeEndObject()
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> colMembers
- INTERFERENCE_GROUP_TARGET -> colTarget
- INTERFERENCE_GROUP_SCORE -> colScore
- else -> -1
- }
- }
-
- override fun setBoolean(
- index: Int,
- value: Boolean,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setInt(
- index: Int,
- value: Int,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setLong(
- index: Int,
- value: Long,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setFloat(
- index: Int,
- value: Float,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setDouble(
- index: Int,
- value: Double,
- ) {
- check(isRowActive) { "No active row" }
-
- when (index) {
- colTarget -> targetLoad = (value as Number).toDouble()
- colScore -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column $index")
- }
- }
-
- override fun setString(
- index: Int,
- value: String,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setUUID(
- index: Int,
- value: UUID,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setInstant(
- index: Int,
- value: Instant,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun setDuration(
- index: Int,
- value: Duration,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> setList(
- index: Int,
- value: List<T>,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun <T> setSet(
- index: Int,
- value: Set<T>,
- ) {
- check(isRowActive) { "No active row" }
-
- @Suppress("UNCHECKED_CAST")
- when (index) {
- colMembers -> members = value as Set<String>
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
- }
-
- override fun <K, V> setMap(
- index: Int,
- value: Map<K, V>,
- ) {
- throw IllegalArgumentException("Invalid column $index")
- }
-
- override fun flush() {
- generator.flush()
- }
-
- override fun close() {
- generator.writeEndArray()
- generator.close()
- }
-
- private val colMembers = 0
- private val colTarget = 1
- private val colScore = 2
-
- private var members = emptySet<String>()
- private var targetLoad = Double.POSITIVE_INFINITY
- private var score = 1.0
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt
deleted file mode 100644
index 74e880be..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.formats.opendc
-
-import com.fasterxml.jackson.core.JsonEncoding
-import com.fasterxml.jackson.core.JsonFactory
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetFileWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
-import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
-import org.opendc.trace.conv.TABLE_RESOURCES
-import org.opendc.trace.conv.TABLE_RESOURCE_STATES
-import org.opendc.trace.conv.resourceChildren
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceDeadline
-import org.opendc.trace.conv.resourceDuration
-import org.opendc.trace.conv.resourceGpuCapacity
-import org.opendc.trace.conv.resourceGpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceNature
-import org.opendc.trace.conv.resourceParents
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.conv.resourceSubmissionTime
-import org.opendc.trace.formats.opendc.parquet.ResourceReadSupport
-import org.opendc.trace.formats.opendc.parquet.ResourceStateReadSupport
-import org.opendc.trace.formats.opendc.parquet.ResourceStateWriteSupport
-import org.opendc.trace.formats.opendc.parquet.ResourceWriteSupport
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.nio.file.Files
-import java.nio.file.Path
-import kotlin.io.path.exists
-
-/**
- * A [TraceFormat] implementation of the OpenDC virtual machine trace format.
- */
-public class OdcVmTraceFormat : TraceFormat {
- /**
- * A [JsonFactory] that is used to parse the JSON-based interference model.
- */
- private val jsonFactory = JsonFactory()
-
- /**
- * The name of this trace format.
- */
- override val name: String = "opendc-vm"
-
- override fun create(path: Path) {
- // Construct directory containing the trace files
- Files.createDirectories(path)
-
- val tables = getTables(path)
-
- for (table in tables) {
- val writer = newWriter(path, table)
- writer.close()
- }
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_RESOURCES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceSubmissionTime, TableColumnType.Instant),
- TableColumn(resourceDuration, TableColumnType.Long),
- TableColumn(resourceCpuCount, TableColumnType.Int),
- TableColumn(resourceCpuCapacity, TableColumnType.Double),
- TableColumn(resourceMemCapacity, TableColumnType.Double),
- TableColumn(resourceGpuCount, TableColumnType.Int),
- TableColumn(resourceGpuCapacity, TableColumnType.Double),
- TableColumn(resourceParents, TableColumnType.Set(TableColumnType.String)),
- TableColumn(resourceChildren, TableColumnType.Set(TableColumnType.String)),
- TableColumn(resourceNature, TableColumnType.String),
- TableColumn(resourceDeadline, TableColumnType.Long),
- ),
- )
- TABLE_RESOURCE_STATES ->
- TableDetails(
- listOf(
- TableColumn(resourceID, TableColumnType.String),
- TableColumn(resourceStateTimestamp, TableColumnType.Instant),
- TableColumn(resourceStateDuration, TableColumnType.Duration),
- TableColumn(resourceCpuCount, TableColumnType.Int),
- TableColumn(resourceStateCpuUsage, TableColumnType.Double),
- ),
- )
- TABLE_INTERFERENCE_GROUPS ->
- TableDetails(
- listOf(
- TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
- TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_RESOURCES -> {
- val reader = LocalParquetReader(path.resolve("tasks.parquet"), ResourceReadSupport(projection))
- OdcVmResourceTableReader(reader)
- }
- TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader(path.resolve("fragments.parquet"), ResourceStateReadSupport(projection))
- OdcVmResourceStateTableReader(reader)
- }
- TABLE_INTERFERENCE_GROUPS -> {
- val modelPath = path.resolve("interference-model.json")
- val parser =
- if (modelPath.exists()) {
- jsonFactory.createParser(modelPath.toFile())
- } else {
- jsonFactory.createParser("[]") // If model does not exist, return empty model
- }
-
- OdcVmInterferenceJsonTableReader(parser)
- }
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- return when (table) {
- TABLE_RESOURCES -> {
- val writer =
- LocalParquetWriter.builder(path.resolve("tasks.parquet"), ResourceWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
- OdcVmResourceTableWriter(writer)
- }
- TABLE_RESOURCE_STATES -> {
- val writer =
- LocalParquetWriter.builder(path.resolve("fragments.parquet"), ResourceStateWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
- OdcVmResourceStateTableWriter(writer)
- }
- TABLE_INTERFERENCE_GROUPS -> {
- val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
- OdcVmInterferenceJsonTableWriter(generator)
- }
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt
deleted file mode 100644
index cd2ccef7..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.formats.opendc.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.InitContext
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Type
-import org.apache.parquet.schema.Types
-import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.resourceChildren
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceDeadline
-import org.opendc.trace.conv.resourceDuration
-import org.opendc.trace.conv.resourceGpuCapacity
-import org.opendc.trace.conv.resourceGpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceNature
-import org.opendc.trace.conv.resourceParents
-import org.opendc.trace.conv.resourceSubmissionTime
-
-/**
- * A [ReadSupport] instance for [Resource] objects.
- */
-internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() {
- /**
- * Mapping from field names to [TableColumn]s.
- */
- private val fieldMap =
- mapOf(
- "id" to resourceID,
- "submissionTime" to resourceSubmissionTime,
- "submission_time" to resourceSubmissionTime,
- "duration" to resourceDuration,
- "maxCores" to resourceCpuCount,
- "cpu_count" to resourceCpuCount,
- "cpu_capacity" to resourceCpuCapacity,
- "requiredMemory" to resourceMemCapacity,
- "mem_capacity" to resourceMemCapacity,
- "gpu_count" to resourceGpuCount,
- "gpu_capacity" to resourceGpuCapacity,
- "parents" to resourceParents,
- "children" to resourceChildren,
- "nature" to resourceNature,
- "deadline" to resourceDeadline,
- )
-
- override fun init(context: InitContext): ReadContext {
- val projectedSchema =
- if (projection != null) {
- Types.buildMessage()
- .apply {
- val projectionSet = projection.toSet()
-
- for (field in READ_SCHEMA.fields) {
- val col = fieldMap[field.name] ?: continue
- if (col in projectionSet) {
- addField(field)
- }
- }
- }
- .named(READ_SCHEMA.name)
- } else {
- READ_SCHEMA
- }
-
- return ReadContext(projectedSchema)
- }
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext,
- ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema (version 2.0) for the "resources" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_0: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("submissionTime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("maxCores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("requiredMemory"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("nature"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("deadline"),
- )
- .named("resource")
-
- /**
- * Parquet read schema (version 2.1) for the "resources" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_2: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("submission_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("gpu_count"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("gpu_capacity"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(
- Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("element"),
- )
- .named("list"),
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("parents"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(
- Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("element"),
- )
- .named("list"),
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("children"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("nature"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("deadline"),
- )
- .named("resource")
-
- /**
- * Parquet read schema for the "resources" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType =
- READ_SCHEMA_V2_2
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt
deleted file mode 100644
index 53e594de..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.formats.opendc.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.InitContext
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateTimestamp
-
-/**
- * A [ReadSupport] instance for [ResourceState] objects.
- */
-internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() {
- /**
- * Mapping from field names to [TableColumn]s.
- */
- private val fieldMap =
- mapOf(
- "id" to resourceID,
- "time" to resourceStateTimestamp,
- "timestamp" to resourceStateTimestamp,
- "duration" to resourceStateDuration,
- "cores" to resourceCpuCount,
- "cpu_count" to resourceCpuCount,
- "cpuUsage" to resourceStateCpuUsage,
- "cpu_usage" to resourceStateCpuUsage,
- )
-
- override fun init(context: InitContext): ReadContext {
- val projectedSchema =
- if (projection != null) {
- Types.buildMessage()
- .apply {
- val projectionSet = projection.toSet()
-
- for (field in READ_SCHEMA.fields) {
- val col = fieldMap[field.name] ?: continue
- if (col in projectionSet) {
- addField(field)
- }
- }
- }
- .named(READ_SCHEMA.name)
- } else {
- READ_SCHEMA
- }
-
- return ReadContext(projectedSchema)
- }
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext,
- ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema (version 2.0) for the "resource states" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_0: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpuUsage"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("gpuCount"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("gpuUsage"),
- )
- .named("resource_state")
-
- /**
- * Parquet read schema (version 2.1) for the "resource states" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA_V2_1: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("gpu_count"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("gpu_usage"),
- )
- .named("resource_state")
-
- /**
- * Parquet read schema for the "resource states" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt
deleted file mode 100644
index 5a79fd6f..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.swf
-
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.TASK_ALLOC_NCPUS
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_STATUS
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import java.io.BufferedReader
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] implementation for the SWF format.
- */
-internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader {
- /**
- * A flag to indicate the state of the reader
- */
- private var state = State.Pending
-
- /**
- * The current row.
- */
- private var fields = emptyList<String>()
-
- /**
- * A [Regex] object to match whitespace.
- */
- private val whitespace = "\\s+".toRegex()
-
- override fun nextRow(): Boolean {
- var line: String?
- var num = 0
-
- val state = state
- if (state == State.Closed) {
- return false
- } else if (state == State.Pending) {
- this.state = State.Active
- }
-
- while (true) {
- line = reader.readLine()
-
- if (line == null) {
- this.state = State.Closed
- return false
- }
- num++
-
- if (line.isBlank()) {
- // Ignore empty lines
- continue
- } else if (line.startsWith(";")) {
- // Ignore comments for now
- continue
- }
-
- break
- }
-
- fields = line!!.trim().split(whitespace)
-
- if (fields.size < 18) {
- throw IllegalArgumentException("Invalid format at line $line")
- }
-
- return true
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- TASK_ID -> colJobID
- TASK_SUBMIT_TIME -> colSubmitTime
- TASK_WAIT_TIME -> colWaitTime
- TASK_RUNTIME -> colRunTime
- TASK_ALLOC_NCPUS -> colAllocNcpus
- TASK_REQ_NCPUS -> colReqNcpus
- TASK_STATUS -> colStatus
- TASK_USER_ID -> colUserID
- TASK_GROUP_ID -> colGroupID
- TASK_PARENTS -> colParentJob
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in colJobID..colParentThinkTime) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colReqNcpus, colAllocNcpus, colStatus, colGroupID, colUserID -> fields[index].toInt(10)
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getString(index: Int): String {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colJobID -> fields[index]
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colSubmitTime -> Instant.ofEpochSecond(fields[index].toLong(10))
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration? {
- check(state == State.Active) { "No active row" }
- return when (index) {
- colWaitTime, colRunTime -> Duration.ofSeconds(fields[index].toLong(10))
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- check(state == State.Active) { "No active row" }
- @Suppress("UNCHECKED_CAST")
- return when (index) {
- colParentJob -> {
- require(elementType.isAssignableFrom(String::class.java))
- val parent = fields[index].toLong(10)
- if (parent < 0) emptySet() else setOf(parent)
- }
- else -> throw IllegalArgumentException("Invalid column")
- } as Set<T>?
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- reader.close()
- state = State.Closed
- }
-
- /**
- * Default column indices for the SWF format.
- */
- private val colJobID = 0
- private val colSubmitTime = 1
- private val colWaitTime = 2
- private val colRunTime = 3
- private val colAllocNcpus = 4
- private val colAvgCpuTime = 5
- private val colUsedMem = 6
- private val colReqNcpus = 7
- private val colReqTime = 8
- private val colReqMem = 9
- private val colStatus = 10
- private val colUserID = 11
- private val colGroupID = 12
- private val colExecNum = 13
- private val colQueueNum = 14
- private val colPartNum = 15
- private val colParentJob = 16
- private val colParentThinkTime = 17
-
- private enum class State {
- Pending,
- Active,
- Closed,
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt
deleted file mode 100644
index d59b07b4..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.swf
-
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_TASKS
-import org.opendc.trace.conv.TASK_ALLOC_NCPUS
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_STATUS
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import java.nio.file.Path
-import kotlin.io.path.bufferedReader
-
-/**
- * Support for the Standard Workload Format (SWF) in OpenDC.
- *
- * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html
- */
-public class SwfTraceFormat : TraceFormat {
- override val name: String = "swf"
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_TASKS ->
- TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_STATUS, TableColumnType.Int),
- TableColumn(TASK_GROUP_ID, TableColumnType.Int),
- TableColumn(TASK_USER_ID, TableColumnType.Int),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt
deleted file mode 100644
index 8f84e51f..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wfformat
-
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.core.JsonParser
-import com.fasterxml.jackson.core.JsonToken
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.util.convertTo
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-import kotlin.math.roundToInt
-
-/**
- * A [TableReader] implementation for the WfCommons workload trace format.
- */
-internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader {
- /**
- * The current nesting of the parser.
- */
- private var level: ParserLevel = ParserLevel.TOP
-
- override fun nextRow(): Boolean {
- reset()
-
- var hasJob = false
-
- while (!hasJob) {
- when (level) {
- ParserLevel.TOP -> {
- val token = parser.nextToken()
-
- // Check whether the document is not empty and starts with an object
- if (token == null) {
- parser.close()
- break
- } else if (token != JsonToken.START_OBJECT) {
- throw JsonParseException(parser, "Expected object", parser.currentLocation)
- } else {
- level = ParserLevel.TRACE
- }
- }
- ParserLevel.TRACE -> {
- // Seek for the workflow object in the file
- if (!seekWorkflow()) {
- parser.close()
- break
- } else if (!parser.isExpectedStartObjectToken) {
- throw JsonParseException(parser, "Expected object", parser.currentLocation)
- } else {
- level = ParserLevel.WORKFLOW
- }
- }
- ParserLevel.WORKFLOW -> {
- // Seek for the jobs object in the file
- level =
- if (!seekJobs()) {
- ParserLevel.TRACE
- } else if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array", parser.currentLocation)
- } else {
- ParserLevel.JOB
- }
- }
- ParserLevel.JOB -> {
- when (parser.nextToken()) {
- JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW
- JsonToken.START_OBJECT -> {
- parseJob()
- hasJob = true
- break
- }
- else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation)
- }
- }
- }
- }
-
- return hasJob
- }
-
- override fun resolve(name: String): Int {
- return when (name) {
- TASK_ID -> colID
- TASK_WORKFLOW_ID -> colWorkflowID
- TASK_RUNTIME -> colRuntime
- TASK_REQ_NCPUS -> colNproc
- TASK_PARENTS -> colParents
- TASK_CHILDREN -> colChildren
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in 0..colChildren) { "Invalid column value" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- checkActive()
- return when (index) {
- colNproc -> cores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getString(index: Int): String? {
- checkActive()
- return when (index) {
- colID -> id
- colWorkflowID -> workflowId
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDuration(index: Int): Duration? {
- checkActive()
- return when (index) {
- colRuntime -> runtime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- checkActive()
- return when (index) {
- colParents -> typeParents.convertTo(parents, elementType)
- colChildren -> typeChildren.convertTo(children, elementType)
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Helper method to check if the reader is active.
- */
- private fun checkActive() {
- check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" }
- }
-
- /**
- * Parse the trace and seek until the workflow description.
- */
- private fun seekWorkflow(): Boolean {
- while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) {
- when (parser.currentName) {
- "name" -> workflowId = parser.text
- "workflow" -> return true
- else -> parser.skipChildren()
- }
- }
-
- return false
- }
-
- /**
- * Parse the workflow description in the file and seek until the first job.
- */
- private fun seekJobs(): Boolean {
- while (parser.nextValue() != JsonToken.END_OBJECT) {
- when (parser.currentName) {
- "jobs" -> return true
- else -> parser.skipChildren()
- }
- }
-
- return false
- }
-
- /**
- * Parse a single job in the file.
- */
- private fun parseJob() {
- while (parser.nextValue() != JsonToken.END_OBJECT) {
- when (parser.currentName) {
- "name" -> id = parser.text
- "parents" -> parents = parseIds()
- "children" -> children = parseIds()
- "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong())
- "cores" -> cores = parser.floatValue.roundToInt()
- else -> parser.skipChildren()
- }
- }
- }
-
- /**
- * Parse the parents/children of a job.
- */
- private fun parseIds(): Set<String> {
- if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array", parser.currentLocation)
- }
-
- val ids = mutableSetOf<String>()
-
- while (parser.nextToken() != JsonToken.END_ARRAY) {
- if (parser.currentToken != JsonToken.VALUE_STRING) {
- throw JsonParseException(parser, "Expected token", parser.currentLocation)
- }
-
- ids.add(parser.valueAsString)
- }
-
- return ids
- }
-
- private enum class ParserLevel {
- TOP,
- TRACE,
- WORKFLOW,
- JOB,
- }
-
- /**
- * State fields for the parser.
- */
- private var id: String? = null
- private var workflowId: String? = null
- private var runtime: Duration? = null
- private var parents: Set<String>? = null
- private var children: Set<String>? = null
- private var cores = -1
-
- private fun reset() {
- id = null
- runtime = null
- parents = null
- children = null
- cores = -1
- }
-
- private val colID = 0
- private val colWorkflowID = 1
- private val colRuntime = 3
- private val colNproc = 4
- private val colParents = 5
- private val colChildren = 6
-
- private val typeParents = TableColumnType.Set(TableColumnType.String)
- private val typeChildren = TableColumnType.Set(TableColumnType.String)
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt
deleted file mode 100644
index 2178fac6..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wfformat
-
-import com.fasterxml.jackson.core.JsonFactory
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_TASKS
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import java.nio.file.Path
-
-/**
- * A [TraceFormat] implementation for the WfCommons workload trace format.
- */
-public class WfFormatTraceFormat : TraceFormat {
- /**
- * The [JsonFactory] that is used to created JSON parsers.
- */
- private val factory = JsonFactory()
-
- override val name: String = "wfformat"
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_TASKS ->
- TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt
index d474e0ec..947746c6 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt
@@ -20,17 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc
+package org.opendc.trace.formats.workload
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceGpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateGpuUsage
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.formats.opendc.parquet.ResourceState
+import org.opendc.trace.conv.FRAGMENT_CPU_USAGE
+import org.opendc.trace.conv.FRAGMENT_DURATION
+import org.opendc.trace.conv.FRAGMENT_GPU_USAGE
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.formats.workload.parquet.Fragment
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
import java.time.Instant
@@ -39,11 +36,11 @@ import java.util.UUID
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
+internal class FragmentTableReader(private val reader: LocalParquetReader<Fragment>) : TableReader {
/**
* The current record.
*/
- private var record: ResourceState? = null
+ private var record: Fragment? = null
override fun nextRow(): Boolean {
try {
@@ -58,23 +55,16 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
private val colID = 0
- private val colTimestamp = 1
- private val colDuration = 2
- private val colCpuCount = 3
- private val colCpuUsage = 4
- private val colGpuCount = 5
- private val colGpuUsage = 6
- private val colMemoryCapacity = 7
+ private val colDuration = 1
+ private val colCpuUsage = 2
+ private val colGpuUsage = 3
override fun resolve(name: String): Int {
return when (name) {
- resourceID -> colID
- resourceStateTimestamp -> colTimestamp
- resourceStateDuration -> colDuration
- resourceCpuCount -> colCpuCount
- resourceStateCpuUsage -> colCpuUsage
- resourceGpuCount -> colGpuCount
- resourceStateGpuUsage -> colGpuUsage
+ TASK_ID -> colID
+ FRAGMENT_DURATION -> colDuration
+ FRAGMENT_CPU_USAGE -> colCpuUsage
+ FRAGMENT_GPU_USAGE -> colGpuUsage
else -> -1
}
}
@@ -91,8 +81,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- colCpuCount -> record.cpuCount
- colGpuCount -> record.gpuCount
+ colID -> record.id
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -115,12 +104,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
override fun getString(index: Int): String {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colID -> record.id
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
+ throw IllegalArgumentException("Invalid column index $index")
}
override fun getUUID(index: Int): UUID? {
@@ -128,12 +112,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
override fun getInstant(index: Int): Instant {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colTimestamp -> record.timestamp
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
+ throw IllegalArgumentException("Invalid column index $index")
}
override fun getDuration(index: Int): Duration {
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt
index c6f117d2..33cd9e17 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt
@@ -20,18 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc
+package org.opendc.trace.formats.workload
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceGpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceStateCpuUsage
-import org.opendc.trace.conv.resourceStateDuration
-import org.opendc.trace.conv.resourceStateGpuUsage
-import org.opendc.trace.conv.resourceStateTimestamp
-import org.opendc.trace.formats.opendc.parquet.ResourceState
+import org.opendc.trace.conv.FRAGMENT_CPU_USAGE
+import org.opendc.trace.conv.FRAGMENT_DURATION
+import org.opendc.trace.conv.FRAGMENT_GPU_USAGE
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.formats.workload.parquet.Fragment
import java.time.Duration
import java.time.Instant
import java.util.UUID
@@ -39,27 +36,21 @@ import java.util.UUID
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
+internal class FragmentTableWriter(private val writer: ParquetWriter<Fragment>) : TableWriter {
/**
* The current state for the record that is being written.
*/
private var localIsActive = false
- private var localID: String = ""
- private var localTimestamp: Instant = Instant.MIN
+ private var localID: Int = -99
private var localDuration: Duration = Duration.ZERO
- private var localCpuCount: Int = 0
private var localCpuUsage: Double = Double.NaN
- private var localGpuCount: Int = 0
private var localGpuUsage: Double = Double.NaN
override fun startRow() {
localIsActive = true
- localID = ""
- localTimestamp = Instant.MIN
+ localID = -99
localDuration = Duration.ZERO
- localCpuCount = 0
localCpuUsage = Double.NaN
- localGpuCount = 0
localGpuUsage = Double.NaN
}
@@ -67,23 +58,19 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
check(localIsActive) { "No active row" }
localIsActive = false
- check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+ check(lastId != localID) { "Records need to be ordered by id" } // NOTE(review): fragments of one task are written consecutively, so lastId == localID is expected — this check looks inverted; confirm against the fragment-conversion callers
- writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage, localGpuCount, localGpuUsage))
+ writer.write(Fragment(localID, localDuration, localCpuUsage, localGpuUsage))
lastId = localID
- lastTimestamp = localTimestamp
}
override fun resolve(name: String): Int {
return when (name) {
- resourceID -> colID
- resourceStateTimestamp -> colTimestamp
- resourceStateDuration -> colDuration
- resourceCpuCount -> colCpuCount
- resourceStateCpuUsage -> colCpuUsage
- resourceGpuCount -> colGpuCount
- resourceStateGpuUsage -> colGpuUsage
+ TASK_ID -> colID
+ FRAGMENT_DURATION -> colDuration
+ FRAGMENT_CPU_USAGE -> colCpuUsage
+ FRAGMENT_GPU_USAGE -> colGpuUsage
else -> -1
}
}
@@ -101,8 +88,7 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
) {
check(localIsActive) { "No active row" }
when (index) {
- colCpuCount -> localCpuCount = value
- colGpuCount -> localGpuCount = value
+ colID -> localID = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -137,12 +123,7 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
index: Int,
value: String,
) {
- check(localIsActive) { "No active row" }
-
- when (index) {
- colID -> localID = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun setUUID(
@@ -156,12 +137,7 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
index: Int,
value: Instant,
) {
- check(localIsActive) { "No active row" }
-
- when (index) {
- colTimestamp -> localTimestamp = value
- else -> throw IllegalArgumentException("Invalid column or type [index $index]")
- }
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun setDuration(
@@ -208,14 +184,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
/**
* Last column values that are used to check for correct partitioning.
*/
- private var lastId: String? = null
- private var lastTimestamp: Instant = Instant.MAX
+ private var lastId: Int? = null
private val colID = 0
- private val colTimestamp = 1
- private val colDuration = 2
- private val colCpuCount = 3
- private val colCpuUsage = 4
- private val colGpuCount = 5
- private val colGpuUsage = 6
+ private val colDuration = 1
+ private val colCpuUsage = 2
+ private val colGpuUsage = 3
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
index 495a5d75..6c700b0c 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
@@ -20,23 +20,24 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc
+package org.opendc.trace.formats.workload
import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.resourceChildren
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceDeadline
-import org.opendc.trace.conv.resourceDuration
-import org.opendc.trace.conv.resourceGpuCapacity
-import org.opendc.trace.conv.resourceGpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceNature
-import org.opendc.trace.conv.resourceParents
-import org.opendc.trace.conv.resourceSubmissionTime
-import org.opendc.trace.formats.opendc.parquet.Resource
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_CPU_CAPACITY
+import org.opendc.trace.conv.TASK_CPU_COUNT
+import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DURATION
+import org.opendc.trace.conv.TASK_GPU_CAPACITY
+import org.opendc.trace.conv.TASK_GPU_COUNT
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_MEM_CAPACITY
+import org.opendc.trace.conv.TASK_NAME
+import org.opendc.trace.conv.TASK_NATURE
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_SUBMISSION_TIME
+import org.opendc.trace.formats.workload.parquet.Task
import org.opendc.trace.util.convertTo
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
@@ -46,11 +47,11 @@ import java.util.UUID
/**
* A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
+internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
/**
* The current record.
*/
- private var record: Resource? = null
+ private var record: Task? = null
override fun nextRow(): Boolean {
try {
@@ -65,35 +66,37 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
private val colID = 0
- private val colSubmissionTime = 1
- private val colDurationTime = 2
- private val colCpuCount = 3
- private val colCpuCapacity = 4
- private val colMemCapacity = 5
- private val colGpuCapacity = 6
- private val colGpuCount = 7
- private val colParents = 8
- private val colChildren = 9
- private val colNature = 10
- private val colDeadline = 11
-
- private val typeParents = TableColumnType.Set(TableColumnType.String)
- private val typeChildren = TableColumnType.Set(TableColumnType.String)
+ private val colName = 1
+ private val colSubmissionTime = 2
+ private val colDurationTime = 3
+ private val colCpuCount = 4
+ private val colCpuCapacity = 5
+ private val colMemCapacity = 6
+ private val colGpuCapacity = 7
+ private val colGpuCount = 8
+ private val colParents = 9
+ private val colChildren = 10
+ private val colNature = 11
+ private val colDeadline = 12
+
+ private val typeParents = TableColumnType.Set(TableColumnType.Int)
+ private val typeChildren = TableColumnType.Set(TableColumnType.Int)
override fun resolve(name: String): Int {
return when (name) {
- resourceID -> colID
- resourceSubmissionTime -> colSubmissionTime
- resourceDuration -> colDurationTime
- resourceCpuCount -> colCpuCount
- resourceCpuCapacity -> colCpuCapacity
- resourceMemCapacity -> colMemCapacity
- resourceGpuCount -> colGpuCount
- resourceGpuCapacity -> colGpuCapacity
- resourceParents -> colParents
- resourceChildren -> colChildren
- resourceNature -> colNature
- resourceDeadline -> colDeadline
+ TASK_ID -> colID
+ TASK_NAME -> colName
+ TASK_SUBMISSION_TIME -> colSubmissionTime
+ TASK_DURATION -> colDurationTime
+ TASK_CPU_COUNT -> colCpuCount
+ TASK_CPU_CAPACITY -> colCpuCapacity
+ TASK_MEM_CAPACITY -> colMemCapacity
+ TASK_GPU_COUNT -> colGpuCount
+ TASK_GPU_CAPACITY -> colGpuCapacity
+ TASK_PARENTS -> colParents
+ TASK_CHILDREN -> colChildren
+ TASK_NATURE -> colNature
+ TASK_DEADLINE -> colDeadline
else -> -1
}
}
@@ -117,6 +120,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
+ colID -> record.id
colCpuCount -> record.cpuCount
colGpuCount -> record.gpuCount
else -> throw IllegalArgumentException("Invalid column")
@@ -151,7 +155,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- colID -> record.id
+ colName -> record.name
colNature -> record.nature
else -> throw IllegalArgumentException("Invalid column")
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
index 022e288a..39be36c1 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
@@ -20,23 +20,24 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc
+package org.opendc.trace.formats.workload
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.resourceChildren
-import org.opendc.trace.conv.resourceCpuCapacity
-import org.opendc.trace.conv.resourceCpuCount
-import org.opendc.trace.conv.resourceDeadline
-import org.opendc.trace.conv.resourceDuration
-import org.opendc.trace.conv.resourceGpuCapacity
-import org.opendc.trace.conv.resourceGpuCount
-import org.opendc.trace.conv.resourceID
-import org.opendc.trace.conv.resourceMemCapacity
-import org.opendc.trace.conv.resourceNature
-import org.opendc.trace.conv.resourceParents
-import org.opendc.trace.conv.resourceSubmissionTime
-import org.opendc.trace.formats.opendc.parquet.Resource
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_CPU_CAPACITY
+import org.opendc.trace.conv.TASK_CPU_COUNT
+import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DURATION
+import org.opendc.trace.conv.TASK_GPU_CAPACITY
+import org.opendc.trace.conv.TASK_GPU_COUNT
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_MEM_CAPACITY
+import org.opendc.trace.conv.TASK_NAME
+import org.opendc.trace.conv.TASK_NATURE
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_SUBMISSION_TIME
+import org.opendc.trace.formats.workload.parquet.Task
import java.time.Duration
import java.time.Instant
import java.util.UUID
@@ -44,12 +45,13 @@ import java.util.UUID
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
+internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableWriter {
/**
* The current state for the record that is being written.
*/
private var localIsActive = false
- private var localId: String = ""
+ private var localId: Int = -99
+ private var localName: String = ""
private var localSubmissionTime: Instant = Instant.MIN
private var localDuration: Long = 0L
private var localCpuCount: Int = 0
@@ -57,14 +59,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
private var localMemCapacity: Double = Double.NaN
private var localGpuCount: Int = 0
private var localGpuCapacity: Double = Double.NaN
- private var localParents = mutableSetOf<String>()
- private var localChildren = mutableSetOf<String>()
+ private var localParents = mutableSetOf<Int>()
+ private var localChildren = mutableSetOf<Int>()
private var localNature: String? = null
private var localDeadline: Long = -1
override fun startRow() {
localIsActive = true
- localId = ""
+ localId = -99
+ localName = ""
localSubmissionTime = Instant.MIN
localDuration = 0L
localCpuCount = 0
@@ -82,8 +85,9 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
check(localIsActive) { "No active row" }
localIsActive = false
writer.write(
- Resource(
+ Task(
localId,
+ localName,
localSubmissionTime,
localDuration,
localCpuCount,
@@ -101,18 +105,19 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
override fun resolve(name: String): Int {
return when (name) {
- resourceID -> colID
- resourceSubmissionTime -> colSubmissionTime
- resourceDuration -> colDuration
- resourceCpuCount -> colCpuCount
- resourceCpuCapacity -> colCpuCapacity
- resourceMemCapacity -> colMemCapacity
- resourceGpuCount -> colGpuCount
- resourceGpuCapacity -> colGpuCapacity
- resourceParents -> colParents
- resourceChildren -> colChildren
- resourceNature -> colNature
- resourceDeadline -> colDeadline
+ TASK_ID -> colID
+ TASK_NAME -> colName
+ TASK_SUBMISSION_TIME -> colSubmissionTime
+ TASK_DURATION -> colDuration
+ TASK_CPU_COUNT -> colCpuCount
+ TASK_CPU_CAPACITY -> colCpuCapacity
+ TASK_MEM_CAPACITY -> colMemCapacity
+ TASK_GPU_COUNT -> colGpuCount
+ TASK_GPU_CAPACITY -> colGpuCapacity
+ TASK_PARENTS -> colParents
+ TASK_CHILDREN -> colChildren
+ TASK_NATURE -> colNature
+ TASK_DEADLINE -> colDeadline
else -> -1
}
}
@@ -130,6 +135,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
) {
check(localIsActive) { "No active row" }
when (index) {
+ colID -> localId = value
colCpuCount -> localCpuCount = value
colGpuCount -> localGpuCount = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
@@ -174,7 +180,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
) {
check(localIsActive) { "No active row" }
when (index) {
- colID -> localId = value
+ colName -> localName = value
colNature -> localNature = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
@@ -235,15 +241,16 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
}
private val colID = 0
- private val colSubmissionTime = 1
- private val colDuration = 2
- private val colCpuCount = 3
- private val colCpuCapacity = 4
- private val colMemCapacity = 5
- private val colGpuCount = 6
- private val colGpuCapacity = 7
- private val colParents = 8
- private val colChildren = 9
- private val colNature = 10
- private val colDeadline = 11
+ private val colName = 1
+ private val colSubmissionTime = 2
+ private val colDuration = 3
+ private val colCpuCount = 4
+ private val colCpuCapacity = 5
+ private val colMemCapacity = 6
+ private val colGpuCount = 7
+ private val colGpuCapacity = 8
+ private val colParents = 9
+ private val colChildren = 10
+ private val colNature = 11
+ private val colDeadline = 12
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt
new file mode 100644
index 00000000..7af0650e
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.workload
+
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.FRAGMENT_CPU_USAGE
+import org.opendc.trace.conv.FRAGMENT_DURATION
+import org.opendc.trace.conv.TABLE_FRAGMENTS
+import org.opendc.trace.conv.TABLE_TASKS
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_CPU_CAPACITY
+import org.opendc.trace.conv.TASK_CPU_COUNT
+import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DURATION
+import org.opendc.trace.conv.TASK_GPU_CAPACITY
+import org.opendc.trace.conv.TASK_GPU_COUNT
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_MEM_CAPACITY
+import org.opendc.trace.conv.TASK_NATURE
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_SUBMISSION_TIME
+import org.opendc.trace.formats.workload.parquet.FragmentReadSupport
+import org.opendc.trace.formats.workload.parquet.FragmentWriteSupport
+import org.opendc.trace.formats.workload.parquet.TaskReadSupport
+import org.opendc.trace.formats.workload.parquet.TaskWriteSupport
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.nio.file.Files
+import java.nio.file.Path
+
+/**
+ * A [TraceFormat] implementation of the OpenDC workload trace format.
+ */
+public class WorkloadTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "workload"
+
+ override fun create(path: Path) {
+ // Construct directory containing the trace files
+ Files.createDirectories(path)
+
+ val tables = getTables(path)
+
+ for (table in tables) {
+ val writer = newWriter(path, table)
+ writer.close()
+ }
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS, TABLE_FRAGMENTS)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.Int),
+ TableColumn(TASK_SUBMISSION_TIME, TableColumnType.Instant),
+ TableColumn(TASK_DURATION, TableColumnType.Long),
+ TableColumn(TASK_CPU_COUNT, TableColumnType.Int),
+ TableColumn(TASK_CPU_CAPACITY, TableColumnType.Double),
+ TableColumn(TASK_MEM_CAPACITY, TableColumnType.Double),
+ TableColumn(TASK_GPU_COUNT, TableColumnType.Int),
+ TableColumn(TASK_GPU_CAPACITY, TableColumnType.Double),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.Int)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.Int)),
+ TableColumn(TASK_NATURE, TableColumnType.String),
+ TableColumn(TASK_DEADLINE, TableColumnType.Long),
+ ),
+ )
+ TABLE_FRAGMENTS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.Int),
+ TableColumn(FRAGMENT_DURATION, TableColumnType.Duration),
+ TableColumn(TASK_CPU_COUNT, TableColumnType.Int),
+ TableColumn(FRAGMENT_CPU_USAGE, TableColumnType.Double),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_TASKS -> {
+ val reader = LocalParquetReader(path.resolve("tasks.parquet"), TaskReadSupport(projection))
+ TaskTableReader(reader)
+ }
+ TABLE_FRAGMENTS -> {
+ val reader = LocalParquetReader(path.resolve("fragments.parquet"), FragmentReadSupport(projection))
+ FragmentTableReader(reader)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ return when (table) {
+ TABLE_TASKS -> {
+ val writer =
+ LocalParquetWriter.builder(path.resolve("tasks.parquet"), TaskWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ TaskTableWriter(writer)
+ }
+ TABLE_FRAGMENTS -> {
+ val writer =
+ LocalParquetWriter.builder(path.resolve("fragments.parquet"), FragmentWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withDictionaryEncoding("id", true)
+ .withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ FragmentTableWriter(writer)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt
index 10fc6be4..44385088 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt
@@ -20,17 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc.parquet
+package org.opendc.trace.formats.workload.parquet
import java.time.Duration
-import java.time.Instant
-internal class ResourceState(
- val id: String,
- val timestamp: Instant,
+internal class Fragment(
+ val id: Int,
val duration: Duration,
- val cpuCount: Int,
val cpuUsage: Double,
- val gpuCount: Int,
val gpuUsage: Double,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt
new file mode 100644
index 00000000..3fa914bc
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.workload.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Types
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.FRAGMENT_CPU_USAGE
+import org.opendc.trace.conv.FRAGMENT_DURATION
+import org.opendc.trace.conv.TASK_ID
+
+/**
+ * A [ReadSupport] instance for [Fragment] objects.
+ */
+internal class FragmentReadSupport(private val projection: List<String>?) : ReadSupport<Fragment>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap =
+ mapOf(
+ "id" to TASK_ID,
+ "duration" to FRAGMENT_DURATION,
+ "cpuUsage" to FRAGMENT_CPU_USAGE,
+ "cpu_usage" to FRAGMENT_CPU_USAGE,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in FRAGMENT_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(FRAGMENT_SCHEMA.name)
+ } else {
+ FRAGMENT_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<Fragment> = FragmentRecordMaterializer(readContext.requestedSchema)
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt
index ee5e56aa..7902cab1 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt
@@ -20,9 +20,8 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc.parquet
+package org.opendc.trace.formats.workload.parquet
-import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.Converter
import org.apache.parquet.io.api.GroupConverter
import org.apache.parquet.io.api.PrimitiveConverter
@@ -32,13 +31,13 @@ import java.time.Duration
import java.time.Instant
/**
- * A [RecordMaterializer] for [ResourceState] records.
+ * A [RecordMaterializer] for [Fragment] records.
*/
-internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
+internal class FragmentRecordMaterializer(schema: MessageType) : RecordMaterializer<Fragment>() {
/**
* State of current record being read.
*/
- private var localId = ""
+ private var localId = -99
private var localTimestamp = Instant.MIN
private var localDuration = Duration.ZERO
private var localCpuCount = 0
@@ -59,8 +58,8 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate
when (type.name) {
"id" ->
object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- localId = value.toStringUsingUTF8()
+ override fun addInt(value: Int) {
+ localId = value
}
}
"timestamp", "time" ->
@@ -104,8 +103,7 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate
}
override fun start() {
- localId = ""
- localTimestamp = Instant.MIN
+ localId = -99
localDuration = Duration.ZERO
localCpuCount = 0
localCpuUsage = 0.0
@@ -118,14 +116,11 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate
override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
}
- override fun getCurrentRecord(): ResourceState =
- ResourceState(
+ override fun getCurrentRecord(): Fragment =
+ Fragment(
localId,
- localTimestamp,
localDuration,
- localCpuCount,
localCpuUsage,
- localGpuCount,
localGpuUsage,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt
new file mode 100644
index 00000000..cd499e7e
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentSchemas.kt
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.workload.parquet
+
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+
+private val FRAGMENT_SCHEMA_v1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("gpu_count"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("gpu_usage"),
+ )
+ .named("resource_state")
+
+private val FRAGMENT_SCHEMA_v2: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("gpu_usage"),
+ )
+ .named("resource_state")
+
+/**
+ * Parquet read schema for the "fragments" table in the trace.
+ */
+public val FRAGMENT_SCHEMA: MessageType = FRAGMENT_SCHEMA_v2
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt
index 58c43916..e6b7ba4f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt
@@ -20,11 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc.parquet
+package org.opendc.trace.formats.workload.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
@@ -32,9 +31,9 @@ import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
/**
- * Support for writing [Resource] instances to Parquet format.
+ * Support for writing [Fragment] instances to Parquet format.
*/
-internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
+internal class FragmentWriteSupport : WriteSupport<Fragment>() {
/**
* The current active record consumer.
*/
@@ -48,32 +47,24 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
this.recordConsumer = recordConsumer
}
- override fun write(record: ResourceState) {
+ override fun write(record: Fragment) {
write(recordConsumer, record)
}
private fun write(
consumer: RecordConsumer,
- record: ResourceState,
+ record: Fragment,
) {
consumer.startMessage()
consumer.startField("id", 0)
- consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.addInteger(record.id)
consumer.endField("id", 0)
- consumer.startField("timestamp", 1)
- consumer.addLong(record.timestamp.toEpochMilli())
- consumer.endField("timestamp", 1)
-
- consumer.startField("duration", 2)
- consumer.addLong(record.duration.toMillis())
- consumer.endField("duration", 2)
+ consumer.startField("duration", 1)
+ consumer.addLong(record.duration.toMillis())
+ consumer.endField("duration", 1)
- consumer.startField("cpu_count", 3)
- consumer.addInteger(record.cpuCount)
- consumer.endField("cpu_count", 3)
-
- consumer.startField("cpu_usage", 4)
- consumer.addDouble(record.cpuUsage)
- consumer.endField("cpu_usage", 4)
+ consumer.startField("cpu_usage", 2)
+ consumer.addDouble(record.cpuUsage)
+ consumer.endField("cpu_usage", 2)
@@ -101,9 +92,6 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
.required(PrimitiveType.PrimitiveTypeName.INT64)
.named("duration"),
Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
.named("cpu_usage"),
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
index d727920a..f661d5a9 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
@@ -20,15 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc.parquet
+package org.opendc.trace.formats.workload.parquet
import java.time.Instant
/**
* A description of a resource in a trace.
*/
-internal data class Resource(
- val id: String,
+internal data class Task(
+ val id: Int,
+ val name: String,
val submissionTime: Instant,
val durationTime: Long,
val cpuCount: Int,
@@ -36,8 +37,8 @@ internal data class Resource(
val memCapacity: Double,
val gpuCount: Int = 0,
val gpuCapacity: Double = 0.0,
- val parents: Set<String> = emptySet(),
- val children: Set<String> = emptySet(),
+ val parents: Set<Int> = emptySet(),
+ val children: Set<Int> = emptySet(),
val nature: String? = null,
val deadline: Long = -1,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
new file mode 100644
index 00000000..4bbb18ac
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.workload.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Types
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_CPU_CAPACITY
+import org.opendc.trace.conv.TASK_CPU_COUNT
+import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DURATION
+import org.opendc.trace.conv.TASK_GPU_CAPACITY
+import org.opendc.trace.conv.TASK_GPU_COUNT
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_MEM_CAPACITY
+import org.opendc.trace.conv.TASK_NAME
+import org.opendc.trace.conv.TASK_NATURE
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_SUBMISSION_TIME
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ */
+internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap =
+ mapOf(
+ "id" to TASK_ID,
+ "name" to TASK_NAME,
+ "submissionTime" to TASK_SUBMISSION_TIME,
+ "submission_time" to TASK_SUBMISSION_TIME,
+ "duration" to TASK_DURATION,
+ "maxCores" to TASK_CPU_COUNT,
+ "cpu_count" to TASK_CPU_COUNT,
+ "cpu_capacity" to TASK_CPU_CAPACITY,
+ "requiredMemory" to TASK_MEM_CAPACITY,
+ "mem_capacity" to TASK_MEM_CAPACITY,
+ "gpu_count" to TASK_GPU_COUNT,
+ "gpu_capacity" to TASK_GPU_CAPACITY,
+ "parents" to TASK_PARENTS,
+ "children" to TASK_CHILDREN,
+ "nature" to TASK_NATURE,
+ "deadline" to TASK_DEADLINE,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in TASK_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(TASK_SCHEMA.name)
+ } else {
+ TASK_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
index f9493721..12dc54b7 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc.parquet
+package org.opendc.trace.formats.workload.parquet
import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.Converter
@@ -31,13 +31,14 @@ import org.apache.parquet.schema.MessageType
import java.time.Instant
/**
- * A [RecordMaterializer] for [Resource] records.
+ * A [RecordMaterializer] for [Task] records.
*/
-internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
/**
* State of current record being read.
*/
- private var localId = ""
+ private var localId = -99
+ private var localName = ""
private var localSubmissionTime = Instant.MIN
private var localDuration = 0L
private var localCpuCount = 0
@@ -45,8 +46,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
private var localMemCapacity = 0.0
private var localGpuCount = 0
private var localGpuCapacity = 0.0
- private var localParents = mutableSetOf<String>()
- private var localChildren = mutableSetOf<String>()
+ private var localParents = mutableSetOf<Int>()
+ private var localChildren = mutableSetOf<Int>()
private var localNature: String? = null
private var localDeadline = -1L
@@ -63,8 +64,14 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
when (type.name) {
"id" ->
object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localId = value
+ }
+ }
+ "name" ->
+ object : PrimitiveConverter() {
override fun addBinary(value: Binary) {
- localId = value.toStringUsingUTF8()
+ localName = value.toStringUsingUTF8()
}
}
"submission_time", "submissionTime" ->
@@ -132,7 +139,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
}
override fun start() {
- localId = ""
+ localId = -99
+ localName = ""
localSubmissionTime = Instant.MIN
localDuration = 0L
localCpuCount = 0
@@ -151,9 +159,10 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
}
- override fun getCurrentRecord(): Resource =
- Resource(
+ override fun getCurrentRecord(): Task =
+ Task(
localId,
+ localName,
localSubmissionTime,
localDuration,
localCpuCount,
@@ -172,12 +181,11 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
/**
* Helper class to convert parent and child relations and add them to [relations].
*/
- private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
+ private class RelationConverter(private val relations: MutableSet<Int>) : GroupConverter() {
private val entryConverter =
object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- val str = value.toStringUsingUTF8()
- relations.add(str)
+ override fun addInt(value: Int) {
+ relations.add(value)
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt
new file mode 100644
index 00000000..f7f5e953
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.workload.parquet
+
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
+
+private val TASK_SCHEMA_V1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submission_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("gpu_count"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("gpu_capacity"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(
+ Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("element"),
+ )
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(
+ Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("element"),
+ )
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("nature"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("deadline"),
+ )
+ .named("resource")
+
+private val TASK_SCHEMA_V2: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("name"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submission_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("gpu_count"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("gpu_capacity"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(
+ Types.optional(
+ PrimitiveType.PrimitiveTypeName.INT32,
+ )
+ .named("element"),
+ )
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(
+ Types.optional(
+ PrimitiveType.PrimitiveTypeName.INT32,
+ )
+ .named("element"),
+ )
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("nature"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("deadline"),
+ )
+ .named("resource")
+
+public val TASK_SCHEMA: MessageType = TASK_SCHEMA_V2
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
index c3e984fb..a7ce62b8 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
@@ -20,47 +20,43 @@
* SOFTWARE.
*/
-package org.opendc.trace.formats.opendc.parquet
+package org.opendc.trace.formats.workload.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
import kotlin.math.roundToLong
/**
- * Support for writing [Resource] instances to Parquet format.
+ * Support for writing [Task] instances to Parquet format.
*/
-internal class ResourceWriteSupport : WriteSupport<Resource>() {
+internal class TaskWriteSupport : WriteSupport<Task>() {
/**
* The current active record consumer.
*/
private lateinit var recordConsumer: RecordConsumer
override fun init(configuration: Configuration): WriteContext {
- return WriteContext(WRITE_SCHEMA, emptyMap())
+ return WriteContext(TASK_SCHEMA, emptyMap())
}
override fun prepareForWrite(recordConsumer: RecordConsumer) {
this.recordConsumer = recordConsumer
}
- override fun write(record: Resource) {
+ override fun write(record: Task) {
write(recordConsumer, record)
}
private fun write(
consumer: RecordConsumer,
- record: Resource,
+ record: Task,
) {
consumer.startMessage()
consumer.startField("id", 0)
- consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.addInteger(record.id)
consumer.endField("id", 0)
consumer.startField("submission_time", 1)
@@ -97,43 +93,4 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() {
consumer.endMessage()
}
-
- companion object {
- /**
- * Parquet schema for the "resources" table in the trace.
- */
- @JvmStatic
- val WRITE_SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("submission_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("nature"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("deadline"),
- )
- .named("resource")
- }
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt
deleted file mode 100644
index 95582388..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf
-
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.util.convertTo
-import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.wtf.parquet.Task
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] implementation for the WTF format.
- */
-internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
- /**
- * The current record.
- */
- private var record: Task? = null
-
- override fun nextRow(): Boolean {
- try {
- val record = reader.read()
- this.record = record
-
- return record != null
- } catch (e: Throwable) {
- this.record = null
- throw e
- }
- }
-
- private val colID = 0
- private val colWorkflowID = 1
- private val colSubmitTime = 2
- private val colWaitTime = 3
- private val colRuntime = 4
- private val colReqNcpus = 5
- private val colParents = 6
- private val colChildren = 7
- private val colGroupID = 8
- private val colUserID = 9
-
- private val typeParents = TableColumnType.Set(TableColumnType.String)
- private val typeChildren = TableColumnType.Set(TableColumnType.String)
-
- override fun resolve(name: String): Int {
- return when (name) {
- TASK_ID -> colID
- TASK_WORKFLOW_ID -> colWorkflowID
- TASK_SUBMIT_TIME -> colSubmitTime
- TASK_WAIT_TIME -> colWaitTime
- TASK_RUNTIME -> colRuntime
- TASK_REQ_NCPUS -> colReqNcpus
- TASK_PARENTS -> colParents
- TASK_CHILDREN -> colChildren
- TASK_GROUP_ID -> colGroupID
- TASK_USER_ID -> colUserID
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in colID..colUserID) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colReqNcpus -> record.requestedCpus
- colGroupID -> record.groupId
- colUserID -> record.userId
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getString(index: Int): String {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colID -> record.id
- colWorkflowID -> record.workflowId
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colSubmitTime -> record.submitTime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colWaitTime -> record.waitTime
- colRuntime -> record.runtime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colParents -> typeParents.convertTo(record.parents, elementType)
- colChildren -> typeChildren.convertTo(record.children, elementType)
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- reader.close()
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt
deleted file mode 100644
index 1386d2ef..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf
-
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_TASKS
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.wtf.parquet.TaskReadSupport
-import java.nio.file.Path
-
-/**
- * A [TraceFormat] implementation for the Workflow Trace Format (WTF).
- */
-public class WtfTraceFormat : TraceFormat {
- override val name: String = "wtf"
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_TASKS ->
- TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_GROUP_ID, TableColumnType.Int),
- TableColumn(TASK_USER_ID, TableColumnType.Int),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_TASKS -> {
- val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false)
- WtfTaskTableReader(reader)
- }
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt
deleted file mode 100644
index a1db0cab..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf.parquet
-
-import java.time.Duration
-import java.time.Instant
-
-/**
- * A task in the Workflow Trace Format.
- */
-internal data class Task(
- val id: String,
- val workflowId: String,
- val submitTime: Instant,
- val waitTime: Duration,
- val runtime: Duration,
- val requestedCpus: Int,
- val groupId: Int,
- val userId: Int,
- val parents: Set<String>,
- val children: Set<String>,
-)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt
deleted file mode 100644
index 1f9c506d..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.InitContext
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Type
-import org.apache.parquet.schema.Types
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-
-/**
- * A [ReadSupport] instance for [Task] objects.
- *
- * @param projection The projection of the table to read.
- */
-internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
- /**
- * Mapping of table columns to their Parquet column names.
- */
- private val colMap =
- mapOf(
- TASK_ID to "id",
- TASK_WORKFLOW_ID to "workflow_id",
- TASK_SUBMIT_TIME to "ts_submit",
- TASK_WAIT_TIME to "wait_time",
- TASK_RUNTIME to "runtime",
- TASK_REQ_NCPUS to "resource_amount_requested",
- TASK_PARENTS to "parents",
- TASK_CHILDREN to "children",
- TASK_GROUP_ID to "group_id",
- TASK_USER_ID to "user_id",
- )
-
- override fun init(context: InitContext): ReadContext {
- val projectedSchema =
- if (projection != null) {
- Types.buildMessage()
- .apply {
- val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
-
- for (col in projection) {
- val fieldName = colMap[col] ?: continue
- addField(fieldByName.getValue(fieldName))
- }
- }
- .named(READ_SCHEMA.name)
- } else {
- READ_SCHEMA
- }
- return ReadContext(projectedSchema)
- }
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext,
- ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema for the "tasks" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("workflow_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("ts_submit"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("wait_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("runtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("resource_amount_requested"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("user_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("group_id"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list"),
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("children"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list"),
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("parents"),
- )
- .named("task")
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt
deleted file mode 100644
index 412a4f8b..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf.parquet
-
-import org.apache.parquet.io.api.Converter
-import org.apache.parquet.io.api.GroupConverter
-import org.apache.parquet.io.api.PrimitiveConverter
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
-import java.time.Duration
-import java.time.Instant
-import kotlin.math.roundToInt
-import kotlin.math.roundToLong
-
-/**
- * A [RecordMaterializer] for [Task] records.
- */
-internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
- /**
- * State of current record being read.
- */
- private var localID = ""
- private var localWorkflowID = ""
- private var localSubmitTime = Instant.MIN
- private var localWaitTime = Duration.ZERO
- private var localRuntime = Duration.ZERO
- private var localRequestedCpus = 0
- private var localGroupId = 0
- private var localUserId = 0
- private var localParents = mutableSetOf<String>()
- private var localChildren = mutableSetOf<String>()
-
- /**
- * Root converter for the record.
- */
- private val root =
- object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters =
- schema.fields.map { type ->
- when (type.name) {
- "id" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localID = value.toString()
- }
- }
- "workflow_id" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localWorkflowID = value.toString()
- }
- }
- "ts_submit" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localSubmitTime = Instant.ofEpochMilli(value)
- }
- }
- "wait_time" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localWaitTime = Duration.ofMillis(value)
- }
- }
- "runtime" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localRuntime = Duration.ofMillis(value)
- }
- }
- "resource_amount_requested" ->
- object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- localRequestedCpus = value.roundToInt()
- }
- }
- "group_id" ->
- object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- localGroupId = value
- }
- }
- "user_id" ->
- object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- localUserId = value
- }
- }
- "children" -> RelationConverter(localChildren)
- "parents" -> RelationConverter(localParents)
- else -> error("Unknown column $type")
- }
- }
-
- override fun start() {
- localID = ""
- localWorkflowID = ""
- localSubmitTime = Instant.MIN
- localWaitTime = Duration.ZERO
- localRuntime = Duration.ZERO
- localRequestedCpus = 0
- localGroupId = 0
- localUserId = 0
- localParents.clear()
- localChildren.clear()
- }
-
- override fun end() {}
-
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
-
- override fun getCurrentRecord(): Task =
- Task(
- localID,
- localWorkflowID,
- localSubmitTime,
- localWaitTime,
- localRuntime,
- localRequestedCpus,
- localGroupId,
- localUserId,
- localParents.toSet(),
- localChildren.toSet(),
- )
-
- override fun getRootConverter(): GroupConverter = root
-
- /**
- * Helper class to convert parent and child relations and add them to [relations].
- */
- private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
- private val entryConverter =
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- relations.add(value.toString())
- }
-
- override fun addDouble(value: Double) {
- relations.add(value.roundToLong().toString())
- }
- }
-
- private val listConverter =
- object : GroupConverter() {
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return entryConverter
- }
-
- override fun start() {}
-
- override fun end() {}
- }
-
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return listConverter
- }
-
- override fun start() {}
-
- override fun end() {}
- }
-}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index e586f90a..945d8f2f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -24,15 +24,9 @@ package org.opendc.trace.spi
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
-import org.opendc.trace.azure.AzureTraceFormat
-import org.opendc.trace.bitbrains.BitbrainsTraceFormat
import org.opendc.trace.formats.carbon.CarbonTraceFormat
import org.opendc.trace.formats.failure.FailureTraceFormat
-import org.opendc.trace.formats.opendc.OdcVmTraceFormat
-import org.opendc.trace.gwf.GwfTraceFormat
-import org.opendc.trace.swf.SwfTraceFormat
-import org.opendc.trace.wfformat.WfFormatTraceFormat
-import org.opendc.trace.wtf.WtfTraceFormat
+import org.opendc.trace.formats.workload.WorkloadTraceFormat
import java.nio.file.Path
import java.util.ServiceLoader
@@ -122,15 +116,9 @@ public interface TraceFormat {
@JvmStatic
public fun byName(name: String): TraceFormat? {
return when (name) {
- "azure" -> AzureTraceFormat()
- "bitbrains" -> BitbrainsTraceFormat()
"carbon" -> CarbonTraceFormat()
"failure" -> FailureTraceFormat()
- "gwf" -> GwfTraceFormat()
- "opendc-vm" -> OdcVmTraceFormat()
- "swf" -> SwfTraceFormat()
- "wfformat" -> WfFormatTraceFormat()
- "wtf" -> WtfTraceFormat()
+ "workload" -> WorkloadTraceFormat()
else -> null
}
}