From 71f63618fb83c8e19ae48d5dc4a6e3927031cc10 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 4 Nov 2025 21:09:38 +0100 Subject: Memory update (#379) * Updated the memory usage of Tasks. Still in Progress. * Merged Task and ServiceTask -> Currently not fully working!!! * Fixed bugs that made the merger between Task and ServiceTask not work well. * Updated jdk version for Dockerfile * Removed ServiceFlavor.java and Task.kt --- .../trace/formats/workload/FragmentTableReader.kt | 6 +-- .../trace/formats/workload/FragmentTableWriter.kt | 6 +-- .../trace/formats/workload/TaskTableReader.kt | 10 ++--- .../trace/formats/workload/TaskTableWriter.kt | 6 +-- .../trace/formats/workload/parquet/Fragment.kt | 32 ---------------- .../workload/parquet/FragmentParquetSchema.kt | 32 ++++++++++++++++ .../workload/parquet/FragmentReadSupport.kt | 6 +-- .../workload/parquet/FragmentRecordMaterializer.kt | 8 ++-- .../workload/parquet/FragmentWriteSupport.kt | 8 ++-- .../opendc/trace/formats/workload/parquet/Task.kt | 44 ---------------------- .../formats/workload/parquet/TaskParquetSchema.kt | 44 ++++++++++++++++++++++ .../formats/workload/parquet/TaskReadSupport.kt | 6 +-- .../workload/parquet/TaskRecordMaterializer.kt | 12 +++--- .../formats/workload/parquet/TaskWriteSupport.kt | 8 ++-- 14 files changed, 114 insertions(+), 114 deletions(-) delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentParquetSchema.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskParquetSchema.kt (limited to 'opendc-trace/opendc-trace-api/src/main') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt index 947746c6..71ab7b64 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableReader.kt @@ -27,7 +27,7 @@ import org.opendc.trace.conv.FRAGMENT_CPU_USAGE import org.opendc.trace.conv.FRAGMENT_DURATION import org.opendc.trace.conv.FRAGMENT_GPU_USAGE import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.formats.workload.parquet.Fragment +import org.opendc.trace.formats.workload.parquet.FragmentParquetSchema import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant @@ -36,11 +36,11 @@ import java.util.UUID /** * A [TableReader] implementation for the OpenDC virtual machine trace format. */ -internal class FragmentTableReader(private val reader: LocalParquetReader) : TableReader { +internal class FragmentTableReader(private val reader: LocalParquetReader) : TableReader { /** * The current record. */ - private var record: Fragment? = null + private var record: FragmentParquetSchema? = null override fun nextRow(): Boolean { try { diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt index 33cd9e17..154e5bf4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/FragmentTableWriter.kt @@ -28,7 +28,7 @@ import org.opendc.trace.conv.FRAGMENT_CPU_USAGE import org.opendc.trace.conv.FRAGMENT_DURATION import org.opendc.trace.conv.FRAGMENT_GPU_USAGE import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.formats.workload.parquet.Fragment +import org.opendc.trace.formats.workload.parquet.FragmentParquetSchema import java.time.Duration import java.time.Instant import java.util.UUID @@ -36,7 +36,7 @@ import java.util.UUID /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class FragmentTableWriter(private val writer: ParquetWriter) : TableWriter { +internal class FragmentTableWriter(private val writer: ParquetWriter) : TableWriter { /** * The current state for the record that is being written. */ @@ -60,7 +60,7 @@ internal class FragmentTableWriter(private val writer: ParquetWriter) check(lastId != localID) { "Records need to be ordered by (id, timestamp)" } - writer.write(Fragment(localID, localDuration, localCpuUsage, localGpuUsage)) + writer.write(FragmentParquetSchema(localID, localDuration, localCpuUsage, localGpuUsage)) lastId = localID } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt index 51ab9242..97b48232 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt @@ -37,7 +37,7 @@ import org.opendc.trace.conv.TASK_MEM_CAPACITY import org.opendc.trace.conv.TASK_NAME import org.opendc.trace.conv.TASK_PARENTS import org.opendc.trace.conv.TASK_SUBMISSION_TIME -import org.opendc.trace.formats.workload.parquet.Task +import org.opendc.trace.formats.workload.parquet.TaskParquetSchema import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration @@ -47,11 +47,11 @@ import java.util.UUID /** * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. */ -internal class TaskTableReader(private val reader: LocalParquetReader) : TableReader { +internal class TaskTableReader(private val reader: LocalParquetReader) : TableReader { /** * The current record. */ - private var record: Task? = null + private var record: TaskParquetSchema? = null override fun nextRow(): Boolean { try { @@ -163,7 +163,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader) : T } } - override fun getUUID(index: Int): UUID? { + override fun getUUID(index: Int): UUID { throw IllegalArgumentException("Invalid column") } @@ -176,7 +176,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader) : T } } - override fun getDuration(index: Int): Duration? { + override fun getDuration(index: Int): Duration { throw IllegalArgumentException("Invalid column") } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt index 5e57fd84..7c34c1b5 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt @@ -37,7 +37,7 @@ import org.opendc.trace.conv.TASK_MEM_CAPACITY import org.opendc.trace.conv.TASK_NAME import org.opendc.trace.conv.TASK_PARENTS import org.opendc.trace.conv.TASK_SUBMISSION_TIME -import org.opendc.trace.formats.workload.parquet.Task +import org.opendc.trace.formats.workload.parquet.TaskParquetSchema import java.time.Duration import java.time.Instant import java.util.UUID @@ -45,7 +45,7 @@ import java.util.UUID /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class TaskTableWriter(private val writer: ParquetWriter) : TableWriter { +internal class TaskTableWriter(private val writer: ParquetWriter) : TableWriter { /** * The current state for the record that is being written. */ @@ -85,7 +85,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter) : TableW check(localIsActive) { "No active row" } localIsActive = false writer.write( - Task( + TaskParquetSchema( localId, localName, localSubmissionTime, diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt deleted file mode 100644 index 44385088..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Fragment.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.workload.parquet - -import java.time.Duration - -internal class Fragment( - val id: Int, - val duration: Duration, - val cpuUsage: Double, - val gpuUsage: Double, -) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentParquetSchema.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentParquetSchema.kt new file mode 100644 index 00000000..e5e0a134 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentParquetSchema.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import java.time.Duration + +internal class FragmentParquetSchema( + val id: Int, + val duration: Duration, + val cpuUsage: Double, + val gpuUsage: Double, +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt index 3fa914bc..1166b980 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentReadSupport.kt @@ -34,9 +34,9 @@ import org.opendc.trace.conv.FRAGMENT_DURATION import org.opendc.trace.conv.TASK_ID /** - * A [ReadSupport] instance for [Fragment] objects. + * A [ReadSupport] instance for [FragmentParquetSchema] objects. */ -internal class FragmentReadSupport(private val projection: List?) : ReadSupport() { +internal class FragmentReadSupport(private val projection: List?) : ReadSupport() { /** * Mapping from field names to [TableColumn]s. */ @@ -75,5 +75,5 @@ internal class FragmentReadSupport(private val projection: List?) : Read keyValueMetaData: Map, fileSchema: MessageType, readContext: ReadContext, - ): RecordMaterializer = FragmentRecordMaterializer(readContext.requestedSchema) + ): RecordMaterializer = FragmentRecordMaterializer(readContext.requestedSchema) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt index 7902cab1..e220d527 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentRecordMaterializer.kt @@ -31,9 +31,9 @@ import java.time.Duration import java.time.Instant /** - * A [RecordMaterializer] for [Fragment] records. + * A [RecordMaterializer] for [FragmentParquetSchema] records. */ -internal class FragmentRecordMaterializer(schema: MessageType) : RecordMaterializer() { +internal class FragmentRecordMaterializer(schema: MessageType) : RecordMaterializer() { /** * State of current record being read. */ @@ -116,8 +116,8 @@ internal class FragmentRecordMaterializer(schema: MessageType) : RecordMateriali override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] } - override fun getCurrentRecord(): Fragment = - Fragment( + override fun getCurrentRecord(): FragmentParquetSchema = + FragmentParquetSchema( localId, localDuration, localCpuUsage, diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt index e6b7ba4f..06e2cfc3 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/FragmentWriteSupport.kt @@ -31,9 +31,9 @@ import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types /** - * Support for writing [Task] instances to Parquet format. + * Support for writing [TaskParquetSchema] instances to Parquet format. */ -internal class FragmentWriteSupport : WriteSupport() { +internal class FragmentWriteSupport : WriteSupport() { /** * The current active record consumer. */ @@ -47,13 +47,13 @@ internal class FragmentWriteSupport : WriteSupport() { this.recordConsumer = recordConsumer } - override fun write(record: Fragment) { + override fun write(record: FragmentParquetSchema) { write(recordConsumer, record) } private fun write( consumer: RecordConsumer, - record: Fragment, + record: FragmentParquetSchema, ) { consumer.startMessage() diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt deleted file mode 100644 index ccc44bde..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.formats.workload.parquet - -import java.time.Instant - -/** - * A description of a resource in a trace. - */ -internal data class Task( - val id: Int, - val name: String, - val submissionTime: Instant, - val durationTime: Long, - val cpuCount: Int, - val cpuCapacity: Double, - val memCapacity: Double, - val gpuCount: Int = 0, - val gpuCapacity: Double = 0.0, - val parents: MutableSet = mutableSetOf(), - val children: Set = emptySet(), - val deferrable: Boolean = false, - val deadline: Long = -1, -) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskParquetSchema.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskParquetSchema.kt new file mode 100644 index 00000000..452bda57 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskParquetSchema.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.workload.parquet + +import java.time.Instant + +/** + * A description of a resource in a trace. + */ +internal data class TaskParquetSchema( + val id: Int, + val name: String?, + val submissionTime: Instant, + val durationTime: Long, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Double, + val gpuCount: Int = 0, + val gpuCapacity: Double = 0.0, + val parents: MutableSet = mutableSetOf(), + val children: Set = emptySet(), + val deferrable: Boolean = false, + val deadline: Long = -1, +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt index 5b743fbe..c8917cf4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt @@ -44,9 +44,9 @@ import org.opendc.trace.conv.TASK_PARENTS import org.opendc.trace.conv.TASK_SUBMISSION_TIME /** - * A [ReadSupport] instance for [Task] objects. + * A [ReadSupport] instance for [TaskParquetSchema] objects. */ -internal class TaskReadSupport(private val projection: List?) : ReadSupport() { +internal class TaskReadSupport(private val projection: List?) : ReadSupport() { /** * Mapping from field names to [TableColumn]s. */ @@ -97,5 +97,5 @@ internal class TaskReadSupport(private val projection: List?) : ReadSupp keyValueMetaData: Map, fileSchema: MessageType, readContext: ReadContext, - ): RecordMaterializer = TaskRecordMaterializer(readContext.requestedSchema) + ): RecordMaterializer = TaskRecordMaterializer(readContext.requestedSchema) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt index b4946ed3..520e7858 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt @@ -31,14 +31,14 @@ import org.apache.parquet.schema.MessageType import java.time.Instant /** - * A [RecordMaterializer] for [Task] records. + * A [RecordMaterializer] for [TaskParquetSchema] records. */ -internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer() { +internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer() { /** * State of current record being read. */ private var localId = -99 - private var localName = "" + private var localName: String? = null private var localSubmissionTime = Instant.MIN private var localDuration = 0L private var localCpuCount = 0 @@ -140,7 +140,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< override fun start() { localId = -99 - localName = "" + localName = null localSubmissionTime = Instant.MIN localDuration = 0L localCpuCount = 0 @@ -159,8 +159,8 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] } - override fun getCurrentRecord(): Task = - Task( + override fun getCurrentRecord(): TaskParquetSchema = + TaskParquetSchema( localId, localName, localSubmissionTime, diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt index c245f804..ae83e9d4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt @@ -28,9 +28,9 @@ import org.apache.parquet.io.api.RecordConsumer import kotlin.math.roundToLong /** - * Support for writing [Task] instances to Parquet format. + * Support for writing [TaskParquetSchema] instances to Parquet format. */ -internal class TaskWriteSupport : WriteSupport() { +internal class TaskWriteSupport : WriteSupport() { /** * The current active record consumer. */ @@ -44,13 +44,13 @@ internal class TaskWriteSupport : WriteSupport() { this.recordConsumer = recordConsumer } - override fun write(record: Task) { + override fun write(record: TaskParquetSchema) { write(recordConsumer, record) } private fun write( consumer: RecordConsumer, - record: Task, + record: TaskParquetSchema, ) { consumer.startMessage() -- cgit v1.2.3