Diffstat (limited to 'opendc')
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt  2
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt  18
-rw-r--r--  opendc/opendc-experiments-sc20/build.gradle.kts  2
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt  229
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt  7
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt  190
-rw-r--r--  opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt  11
7 files changed, 441 insertions, 18 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index b37f05a7..e3227540 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -23,7 +23,7 @@ class VmImage(
val clock = simulationContext.clock
val job = coroutineContext[Job]!!
- for (fragments in flopsHistory.chunked(1024)) {
+ for (fragments in flopsHistory.chunked(128)) {
for (fragment in fragments) {
job.ensureActive()
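The only change here is the chunk size. `chunked` materializes each chunk of the FLOPs history as a `List` before the inner loop runs, so the chunk size directly bounds how many fragments are resident per VM image at a time. A minimal sketch of the knob, over a hypothetical fragment stream:

```kotlin
// Hypothetical fragment stream: chunked(n) buffers n elements into a List
// before the loop body sees them, so shrinking n from 1024 to 128 cuts the
// transient per-image buffer roughly eightfold -- which adds up when many
// VM images iterate their histories concurrently.
fun main() {
    val flopsHistory = generateSequence(0L) { it + 1 }.take(1_000)
    for (fragments in flopsHistory.chunked(128)) {
        println("processing a chunk of ${fragments.size} fragments")
    }
}
```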
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 2c25c0fa..8a32bc43 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -57,6 +57,9 @@ import kotlinx.coroutines.selects.select
import java.util.Objects
import java.util.TreeSet
import java.util.UUID
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -325,7 +328,7 @@ class SimpleVirtDriver(
requests.removeAll(vmRequests)
// Let the vCPU's `run` call return: the requested burst was completed or the deadline was exceeded
- vm.chan.send(Unit)
+ vm.chan?.resume(Unit)
}
}
@@ -371,7 +374,7 @@ class SimpleVirtDriver(
val vm: VmServerContext,
val vcpu: ProcessingUnit,
var burst: Long,
- val limit: Double
+ var limit: Double
) {
/**
* The usage that was actually granted.
@@ -395,7 +398,7 @@ class SimpleVirtDriver(
private var finalized: Boolean = false
lateinit var burst: LongArray
var deadline: Long = 0L
- val chan: Channel<Unit> = Channel(Channel.CONFLATED)
+ var chan: Continuation<Unit>? = null
private var initialized: Boolean = false
internal val job: Job = launch {
@@ -452,6 +455,7 @@ class SimpleVirtDriver(
require(burst.size == limit.size) { "Array dimensions do not match" }
this.deadline = deadline
this.burst = burst
+
val requests = cpus.asSequence()
.take(burst.size)
.mapIndexed { i, cpu ->
@@ -466,13 +470,13 @@ class SimpleVirtDriver(
// Wait until the burst has been run or the coroutine is cancelled
try {
- schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
- chan.receive()
+ schedulingQueue.offer(SchedulerCommand.Schedule(this, requests))
+ suspendCoroutine<Unit> { chan = it }
} catch (e: CancellationException) {
// Deschedule the VM
requests.forEach { it.isCancelled = true }
- schedulingQueue.send(SchedulerCommand.Interrupt)
- chan.receive()
+ schedulingQueue.offer(SchedulerCommand.Interrupt)
+ suspendCoroutine<Unit> { chan = it }
e.assertFailure()
}
}
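These hunks replace the per-VM conflated `Channel<Unit>` with a bare `Continuation<Unit>` field that the scheduler resumes directly, and swap the suspending `schedulingQueue.send` for a non-suspending `offer`. A minimal sketch of the resulting handshake; the `Waiter` class is a hypothetical stand-in for `VmServerContext`:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// The waiting side parks its continuation in a plain field instead of
// receiving from a Channel; the scheduler resumes it once the burst is
// done. This avoids allocating a Channel (and its internal machinery)
// per VM.
class Waiter {
    private var chan: Continuation<Unit>? = null

    // Called by the VM side: suspends until complete() is invoked.
    suspend fun await() = suspendCoroutine<Unit> { chan = it }

    // Called by the scheduler side: resumes the parked coroutine, if any.
    fun complete() {
        chan?.resume(Unit)
        chan = null
    }
}
```

Unlike a conflated channel, a plain continuation has no buffering: a `complete()` that fires before `await()` installs the continuation is lost. That is presumably safe here because `offer` enqueues the command without yielding control, so the continuation is in place before the scheduler can run.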
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 4b73cedd..a78ea745 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -32,7 +32,7 @@ plugins {
application {
mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt"
- applicationDefaultJvmArgs = listOf("-Xmx3096M")
+ applicationDefaultJvmArgs = listOf("-Xmx2200M", "-Xms1800M")
}
dependencies {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
new file mode 100644
index 00000000..30456204
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
@@ -0,0 +1,229 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.Statistics
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.io.api.Binary
+import java.io.File
+import java.io.Serializable
+import java.util.Deque
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.LinkedBlockingDeque
+import kotlin.random.Random
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param traceFile The directory containing the meta.parquet and trace.parquet files.
+ * @param performanceInterferenceModel The performance interference model covering the workloads in the trace.
+ * @param selectedVms The names of the VMs to select from the trace (an empty list selects all VMs).
+ * @param random The source of randomness used to seed the per-VM interference models.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20ParquetTraceReader(
+ traceFile: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ selectedVms: List<String>,
+ random: Random
+) : TraceReader<VmWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>>
+
+ /**
+ * Fill the buffers of the VMs
+ */
+ private fun pull(reader: ParquetReader<GenericData.Record>, buffers: Map<String, Deque<FlopsHistoryFragment>>) {
+ if (!hasNext) {
+ return
+ }
+
+ repeat(buffers.size) {
+ val record = reader.read()
+
+ if (record == null) {
+ hasNext = false
+ reader.close()
+ return
+ }
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ buffers[id]?.add(fragment)
+ }
+ }
+
+ /**
+ * A flag to indicate whether the reader has more entries.
+ */
+ private var hasNext: Boolean = true
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>()
+ val buffers = mutableMapOf<String, Deque<FlopsHistoryFragment>>()
+
+ val filter =
+ if (selectedVms.isEmpty())
+ null
+ else
+ FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
+
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ var idx = 0
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+
+ println(id)
+
+ val buffer = LinkedBlockingDeque<FlopsHistoryFragment>()
+ buffers[id] = buffer
+ val fragments = sequence<FlopsHistoryFragment> {
+ while (true) {
+ if (buffer.isEmpty()) {
+ if (hasNext) {
+ pull(reader, buffers)
+ continue
+ } else {
+ break
+ }
+ }
+ yield(buffer.poll())
+ }
+ }
+ val uuid = UUID(0, (idx++).toLong())
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
+ Random(random.nextInt())
+ )
+ val vmWorkload = VmWorkload(
+ uuid, "VM Workload $id", UnnamedUser,
+ VmImage(
+ uuid,
+ id,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ fragments,
+ maxCores,
+ requiredMemory
+ )
+ )
+
+ entries[uuid] = TraceEntryImpl(
+ submissionTime,
+ vmWorkload
+ )
+ }
+
+ metaReader.close()
+
+ // Create the entry iterator
+ iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {}
+
+ private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
+ override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
+
+ override fun canDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
+ }
+
+ override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
+ }
+ }
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ private data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
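`SelectedVmFilter` above is a Parquet user-defined predicate: `canDrop` is consulted once per row group with the `id` column's min/max statistics, so whole row groups can be skipped without decoding when none of the selected VMs can occur in them. The interval test rests on a small `TreeSet.subSet` trick, sketched here in isolation:

```kotlin
import java.util.TreeSet

// canDrop in isolation: a row group whose "id" column spans [min, max] can
// be skipped iff no selected VM id falls in that range. subSet is half-open
// ([from, to)), so appending "\u0000" -- the smallest possible suffix --
// makes the upper bound effectively inclusive.
fun main() {
    val selectedVms = TreeSet(listOf("vm-103", "vm-250"))
    val min = "vm-104" // row-group statistics, as Parquet would report them
    val max = "vm-200"
    val canDrop = selectedVms.subSet(min, max + "\u0000").isEmpty()
    println(canDrop) // true: neither selected id lies within [vm-104, vm-200]
}
```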
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 552dcb63..19055ad3 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -44,7 +44,6 @@ import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
-import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
@@ -241,7 +240,7 @@ fun main(args: Array<String>) {
var submitted = 0L
val finish = Channel<Unit>(Channel.RENDEZVOUS)
- val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
+ val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
while (reader.hasNext()) {
val (time, workload) = reader.next()
submitted++
@@ -267,8 +266,10 @@ fun main(args: Array<String>) {
}
finish.receive()
- scheduler.terminate()
failureDomain?.cancel()
+ launch {
+ scheduler.terminate()
+ }
println(simulationContext.clock.instant())
println("${System.currentTimeMillis() - start} milliseconds")
}
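The shutdown reordering above cancels the failure domain first and then runs `scheduler.terminate()` in its own coroutine. A plausible reading, assuming `terminate()` is a suspending call that should not hold up the final timing printouts; the `terminate` stub below is hypothetical:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for scheduler.terminate().
suspend fun terminate() = delay(10)

fun main() = runBlocking {
    launch { terminate() } // fire-and-forget teardown in its own coroutine
    println("results printed without awaiting termination")
}
```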
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
new file mode 100644
index 00000000..7f429b89
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
@@ -0,0 +1,190 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A script that converts the SC20 VM trace from its CSV-based format into the
+ * Parquet-based format read by [Sc20ParquetTraceReader].
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+fun main() {
+ val metaSchema = SchemaBuilder
+ .record("meta")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("submissionTime").type().longType().noDefault()
+ .name("maxCores").type().intType().noDefault()
+ .name("requiredMemory").type().longType().noDefault()
+ .endRecord()
+ val schema = SchemaBuilder
+ .record("trace")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("flops").type().longType().noDefault()
+ .endRecord()
+
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val vmIdCol = 19
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ val dest = File("../traces/solvinity/small-parquet")
+ val traceDirectory = File("../traces/solvinity/small")
+ val vms =
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
+ .withSchema(metaSchema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // Page size: the granularity of compression
+ .withRowGroupSize(16 * 1024 * 1024) // Row group size: the unit of write buffering
+ .build()
+
+ val allFragments = mutableListOf<Fragment>()
+
+ vms
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var timestamp = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+
+ vmId = vmFile.name
+ timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores)
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ flopsFragments.forEach { fragment ->
+ allFragments.add(fragment)
+ }
+
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // Page size: the granularity of compression
+ .withRowGroupSize(16 * 1024 * 1024) // Row group size: the unit of write buffering
+ .build()
+
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+
+ writer.close()
+ metaWriter.close()
+}
+
+data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)
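The converter's inner loop folds consecutive idle samples together: when the previous fragment and the current one both carry zero flops, they collapse into a single fragment with a summed duration, so long idle stretches become one row in trace.parquet. The rule in isolation, over a hypothetical `Row` stand-in for `Fragment`:

```kotlin
// The idle-merge rule: consecutive zero-flops fragments are folded into one
// fragment with a summed duration. Row is a hypothetical stand-in for the
// converter's Fragment type.
data class Row(val tick: Long, val flops: Long, val duration: Long)

fun mergeIdle(rows: List<Row>): List<Row> {
    val out = mutableListOf<Row>()
    for (row in rows) {
        val last = out.lastOrNull()
        if (last != null && last.flops == 0L && row.flops == 0L) {
            out[out.size - 1] = last.copy(duration = last.duration + row.duration)
        } else {
            out.add(row)
        }
    }
    return out
}

fun main() {
    val interval = 5 * 60 * 1000L // the converter's 5-minute trace interval
    val rows = listOf(Row(0, 0, interval), Row(interval, 0, interval), Row(2 * interval, 42, interval))
    println(mergeIdle(rows)) // [Row(0, 0, 600000), Row(600000, 42, 300000)]
}
```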
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index da678c07..2e2159ba 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -115,9 +115,9 @@ class Sc20TraceReader(
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
- .chunked(1024)
+ .chunked(128)
.forEach { lines ->
- val res = ArrayList<FlopsHistoryFragment>(lines.size)
+ // val res = ArrayList<FlopsHistoryFragment>(lines.size)
for (line in lines) {
// Ignore comments in the trace
if (line.startsWith("#") || line.isBlank()) {
@@ -144,13 +144,12 @@ class Sc20TraceReader(
val fragment =
FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)
if (last != null) {
- res.add(last!!)
+ yield(last!!)
}
fragment
}
}
-
- yieldAll(res)
+ // yieldAll(res)
}
if (last != null) {
@@ -172,7 +171,7 @@ class Sc20TraceReader(
uuid,
vmId,
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsFragments,
+ flopsFragments.asSequence(),
maxCores,
requiredMemory
)
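The final hunks swap per-chunk buffering (`res.add` followed by `yieldAll(res)`) for yielding each fragment directly from the sequence builder. Both forms emit the same elements; the direct form just keeps a single parsed fragment live instead of a chunk-sized list. A minimal side-by-side, with integers standing in for parsed fragments:

```kotlin
// Same output, different peak memory: the buffered form holds up to a full
// chunk of parsed values before handing them downstream; the streaming form
// hands each one over as soon as it is produced.
fun buffered(lines: Sequence<Int>): Sequence<Int> = sequence {
    lines.chunked(128).forEach { chunk ->
        val res = ArrayList<Int>(chunk.size)
        for (line in chunk) res.add(line * 2) // "parse" the line
        yieldAll(res)
    }
}

fun streaming(lines: Sequence<Int>): Sequence<Int> = sequence {
    lines.chunked(128).forEach { chunk ->
        for (line in chunk) yield(line * 2) // yield as soon as parsed
    }
}

fun main() {
    val lines = (1..300).asSequence()
    println(buffered(lines).toList() == streaming(lines).toList()) // true
}
```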