summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api/src
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-11-04 21:09:38 +0100
committerGitHub <noreply@github.com>2025-11-04 21:09:38 +0100
commit71f63618fb83c8e19ae48d5dc4a6e3927031cc10 (patch)
tree6bf4048b1e683bbcac53e162be787e80828e48e2 /opendc-trace/opendc-trace-api/src
parent59898b873eabc72719376854770c55e8d8efaa0f (diff)
Memory update (#379)
* Updated the memory usage of Tasks. Still in Progress. * Merged Task and ServiceTask -> Currently not fully working!!! * Fixed bugs that made the merger between Task and ServiceTask not work well. * Updated jdk version for Dockerfile * Removed ServiceFlavor.java and Task.kt
Diffstat (limited to 'opendc-trace/opendc-trace-api/src')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt10
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentParquetSchema.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt)2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt8
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt8
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskParquetSchema.kt (renamed from opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt)4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt12
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt8
12 files changed, 41 insertions, 41 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt
index 947746c6..71ab7b64 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt
@@ -27,7 +27,7 @@ import org.opendc.trace.conv.FRAGMENT_CPU_USAGE
import org.opendc.trace.conv.FRAGMENT_DURATION
import org.opendc.trace.conv.FRAGMENT_GPU_USAGE
import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.formats.workload.parquet.Fragment
+import org.opendc.trace.formats.workload.parquet.FragmentParquetSchema
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
import java.time.Instant
@@ -36,11 +36,11 @@ import java.util.UUID
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
*/
-internal class FragmentTableReader(private val reader: LocalParquetReader<Fragment>) : TableReader {
+internal class FragmentTableReader(private val reader: LocalParquetReader<FragmentParquetSchema>) : TableReader {
/**
* The current record.
*/
- private var record: Fragment? = null
+ private var record: FragmentParquetSchema? = null
override fun nextRow(): Boolean {
try {
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt
index 33cd9e17..154e5bf4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt
@@ -28,7 +28,7 @@ import org.opendc.trace.conv.FRAGMENT_CPU_USAGE
import org.opendc.trace.conv.FRAGMENT_DURATION
import org.opendc.trace.conv.FRAGMENT_GPU_USAGE
import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.formats.workload.parquet.Fragment
+import org.opendc.trace.formats.workload.parquet.FragmentParquetSchema
import java.time.Duration
import java.time.Instant
import java.util.UUID
@@ -36,7 +36,7 @@ import java.util.UUID
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class FragmentTableWriter(private val writer: ParquetWriter<Fragment>) : TableWriter {
+internal class FragmentTableWriter(private val writer: ParquetWriter<FragmentParquetSchema>) : TableWriter {
/**
* The current state for the record that is being written.
*/
@@ -60,7 +60,7 @@ internal class FragmentTableWriter(private val writer: ParquetWriter<Fragment>)
check(lastId != localID) { "Records need to be ordered by (id, timestamp)" }
- writer.write(Fragment(localID, localDuration, localCpuUsage, localGpuUsage))
+ writer.write(FragmentParquetSchema(localID, localDuration, localCpuUsage, localGpuUsage))
lastId = localID
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
index 51ab9242..97b48232 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
@@ -37,7 +37,7 @@ import org.opendc.trace.conv.TASK_MEM_CAPACITY
import org.opendc.trace.conv.TASK_NAME
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_SUBMISSION_TIME
-import org.opendc.trace.formats.workload.parquet.Task
+import org.opendc.trace.formats.workload.parquet.TaskParquetSchema
import org.opendc.trace.util.convertTo
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
@@ -47,11 +47,11 @@ import java.util.UUID
/**
* A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
*/
-internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
+internal class TaskTableReader(private val reader: LocalParquetReader<TaskParquetSchema>) : TableReader {
/**
* The current record.
*/
- private var record: Task? = null
+ private var record: TaskParquetSchema? = null
override fun nextRow(): Boolean {
try {
@@ -163,7 +163,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : T
}
}
- override fun getUUID(index: Int): UUID? {
+ override fun getUUID(index: Int): UUID {
throw IllegalArgumentException("Invalid column")
}
@@ -176,7 +176,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : T
}
}
- override fun getDuration(index: Int): Duration? {
+ override fun getDuration(index: Int): Duration {
throw IllegalArgumentException("Invalid column")
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
index 5e57fd84..7c34c1b5 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
@@ -37,7 +37,7 @@ import org.opendc.trace.conv.TASK_MEM_CAPACITY
import org.opendc.trace.conv.TASK_NAME
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_SUBMISSION_TIME
-import org.opendc.trace.formats.workload.parquet.Task
+import org.opendc.trace.formats.workload.parquet.TaskParquetSchema
import java.time.Duration
import java.time.Instant
import java.util.UUID
@@ -45,7 +45,7 @@ import java.util.UUID
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableWriter {
+internal class TaskTableWriter(private val writer: ParquetWriter<TaskParquetSchema>) : TableWriter {
/**
* The current state for the record that is being written.
*/
@@ -85,7 +85,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableW
check(localIsActive) { "No active row" }
localIsActive = false
writer.write(
- Task(
+ TaskParquetSchema(
localId,
localName,
localSubmissionTime,
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentParquetSchema.kt
index 44385088..e5e0a134 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentParquetSchema.kt
@@ -24,7 +24,7 @@ package org.opendc.trace.formats.workload.parquet
import java.time.Duration
-internal class Fragment(
+internal class FragmentParquetSchema(
val id: Int,
val duration: Duration,
val cpuUsage: Double,
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt
index 3fa914bc..1166b980 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt
@@ -34,9 +34,9 @@ import org.opendc.trace.conv.FRAGMENT_DURATION
import org.opendc.trace.conv.TASK_ID
/**
- * A [ReadSupport] instance for [Fragment] objects.
+ * A [ReadSupport] instance for [FragmentParquetSchema] objects.
*/
-internal class FragmentReadSupport(private val projection: List<String>?) : ReadSupport<Fragment>() {
+internal class FragmentReadSupport(private val projection: List<String>?) : ReadSupport<FragmentParquetSchema>() {
/**
* Mapping from field names to [TableColumn]s.
*/
@@ -75,5 +75,5 @@ internal class FragmentReadSupport(private val projection: List<String>?) : Read
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
readContext: ReadContext,
- ): RecordMaterializer<Fragment> = FragmentRecordMaterializer(readContext.requestedSchema)
+ ): RecordMaterializer<FragmentParquetSchema> = FragmentRecordMaterializer(readContext.requestedSchema)
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt
index 7902cab1..e220d527 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt
@@ -31,9 +31,9 @@ import java.time.Duration
import java.time.Instant
/**
- * A [RecordMaterializer] for [Fragment] records.
+ * A [RecordMaterializer] for [FragmentParquetSchema] records.
*/
-internal class FragmentRecordMaterializer(schema: MessageType) : RecordMaterializer<Fragment>() {
+internal class FragmentRecordMaterializer(schema: MessageType) : RecordMaterializer<FragmentParquetSchema>() {
/**
* State of current record being read.
*/
@@ -116,8 +116,8 @@ internal class FragmentRecordMaterializer(schema: MessageType) : RecordMateriali
override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
}
- override fun getCurrentRecord(): Fragment =
- Fragment(
+ override fun getCurrentRecord(): FragmentParquetSchema =
+ FragmentParquetSchema(
localId,
localDuration,
localCpuUsage,
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt
index e6b7ba4f..06e2cfc3 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt
@@ -31,9 +31,9 @@ import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
/**
- * Support for writing [Task] instances to Parquet format.
+ * Support for writing [TaskParquetSchema] instances to Parquet format.
*/
-internal class FragmentWriteSupport : WriteSupport<Fragment>() {
+internal class FragmentWriteSupport : WriteSupport<FragmentParquetSchema>() {
/**
* The current active record consumer.
*/
@@ -47,13 +47,13 @@ internal class FragmentWriteSupport : WriteSupport<Fragment>() {
this.recordConsumer = recordConsumer
}
- override fun write(record: Fragment) {
+ override fun write(record: FragmentParquetSchema) {
write(recordConsumer, record)
}
private fun write(
consumer: RecordConsumer,
- record: Fragment,
+ record: FragmentParquetSchema,
) {
consumer.startMessage()
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskParquetSchema.kt
index ccc44bde..452bda57 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskParquetSchema.kt
@@ -27,9 +27,9 @@ import java.time.Instant
/**
* A description of a resource in a trace.
*/
-internal data class Task(
+internal data class TaskParquetSchema(
val id: Int,
- val name: String,
+ val name: String?,
val submissionTime: Instant,
val durationTime: Long,
val cpuCount: Int,
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
index 5b743fbe..c8917cf4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
@@ -44,9 +44,9 @@ import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_SUBMISSION_TIME
/**
- * A [ReadSupport] instance for [Task] objects.
+ * A [ReadSupport] instance for [TaskParquetSchema] objects.
*/
-internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
+internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<TaskParquetSchema>() {
/**
* Mapping from field names to [TableColumn]s.
*/
@@ -97,5 +97,5 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
readContext: ReadContext,
- ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+ ): RecordMaterializer<TaskParquetSchema> = TaskRecordMaterializer(readContext.requestedSchema)
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
index b4946ed3..520e7858 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
@@ -31,14 +31,14 @@ import org.apache.parquet.schema.MessageType
import java.time.Instant
/**
- * A [RecordMaterializer] for [Task] records.
+ * A [RecordMaterializer] for [TaskParquetSchema] records.
*/
-internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<TaskParquetSchema>() {
/**
* State of current record being read.
*/
private var localId = -99
- private var localName = ""
+ private var localName: String? = null
private var localSubmissionTime = Instant.MIN
private var localDuration = 0L
private var localCpuCount = 0
@@ -140,7 +140,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
override fun start() {
localId = -99
- localName = ""
+ localName = null
localSubmissionTime = Instant.MIN
localDuration = 0L
localCpuCount = 0
@@ -159,8 +159,8 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
}
- override fun getCurrentRecord(): Task =
- Task(
+ override fun getCurrentRecord(): TaskParquetSchema =
+ TaskParquetSchema(
localId,
localName,
localSubmissionTime,
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
index c245f804..ae83e9d4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
@@ -28,9 +28,9 @@ import org.apache.parquet.io.api.RecordConsumer
import kotlin.math.roundToLong
/**
- * Support for writing [Task] instances to Parquet format.
+ * Support for writing [TaskParquetSchema] instances to Parquet format.
*/
-internal class TaskWriteSupport : WriteSupport<Task>() {
+internal class TaskWriteSupport : WriteSupport<TaskParquetSchema>() {
/**
* The current active record consumer.
*/
@@ -44,13 +44,13 @@ internal class TaskWriteSupport : WriteSupport<Task>() {
this.recordConsumer = recordConsumer
}
- override fun write(record: Task) {
+ override fun write(record: TaskParquetSchema) {
write(recordConsumer, record)
}
private fun write(
consumer: RecordConsumer,
- record: Task,
+ record: TaskParquetSchema,
) {
consumer.startMessage()