author    Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-05-13 14:04:01 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-05-13 14:04:01 +0200
commit    5300e0f1df51a2b41be0e76d7c2061315ffea467 (patch)
tree      b8920d81ec9f84e8fecce270a78a96375d393601
parent    5c2270d058c312c94ee0970560009e8008042d10 (diff)
refactor: Share trace across simulations
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt                      |  21
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt |  94
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt       | 159
3 files changed, 267 insertions(+), 7 deletions(-)
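
In outline, the commit moves the expensive Parquet parsing into Sc20RawParquetTraceReader, caches one instance per trace name in ExperimentRunner, and gives each simulation run a lightweight Sc20FilteringParquetTraceReader view on top of it. A minimal sketch of that pattern, assuming the constructors shown in the diff below; the trace directory, trace name and seeds are hypothetical placeholders:

import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.experiments.sc20.trace.Sc20FilteringParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.trace.TraceReader
import java.io.File
import kotlin.random.Random

// Sketch only: parse each trace at most once and reuse it across runs.
val rawTraceReaders = mutableMapOf<String, Sc20RawParquetTraceReader>()

fun readerFor(name: String, seed: Int): TraceReader<VmWorkload> {
    // Expensive Parquet parsing happens only on the first request for this trace name.
    val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File("traces", name)) }
    // Each run gets its own cheap, independently seeded view of the shared trace.
    return Sc20FilteringParquetTraceReader(raw, null, emptySet(), Random(seed))
}

// Two runs over the same trace share one parsed copy ("example-trace" is a placeholder).
val runA = readerFor("example-trace", seed = 0)
val runB = readerFor("example-trace", seed = 1)

The actual runner additionally forwards the experiment's PerformanceInterferenceModel and selected-VM set, which are elided here.
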
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index a4b26ddb..a9ae7c6d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -27,7 +27,8 @@ package com.atlarge.opendc.experiments.sc20
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
-import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20FilteringParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
@@ -63,7 +64,7 @@ public class ExperimentRunner(
private val reporterProvider: ExperimentReporterProvider,
private val environmentPath: File,
private val tracePath: File,
- private val performanceInterferenceModel: PerformanceInterferenceModel
+ private val performanceInterferenceModel: PerformanceInterferenceModel?
) : Closeable {
/**
* The database helper to write the execution plan.
@@ -120,17 +121,23 @@ public class ExperimentRunner(
}
/**
+ * The raw parquet trace readers that are shared across simulations.
+ */
+ private val rawTraceReaders = mutableMapOf<String, Sc20RawParquetTraceReader>()
+
+ /**
* Create a trace reader for the specified trace.
*/
private fun createTraceReader(
name: String,
- performanceInterferenceModel: PerformanceInterferenceModel,
+ performanceInterferenceModel: PerformanceInterferenceModel?,
seed: Int
): TraceReader<VmWorkload> {
- return Sc20StreamingParquetTraceReader(
- File(tracePath, name),
+ val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File(tracePath, name)) }
+ return Sc20FilteringParquetTraceReader(
+ raw,
performanceInterferenceModel,
- emptyList(),
+ emptySet(),
Random(seed)
)
}
@@ -167,7 +174,7 @@ public class ExperimentRunner(
val plan = createPlan()
val total = plan.size
val finished = AtomicInteger()
- val dispatcher = Executors.newWorkStealingPool().asCoroutineDispatcher()
+ val dispatcher = Executors.newWorkStealingPool(2).asCoroutineDispatcher()
runBlocking {
val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!!
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt
new file mode 100644
index 00000000..1e9950de
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt
@@ -0,0 +1,94 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import kotlin.random.Random
+
+/**
+ * A [TraceReader] for the internal VM workload trace format that filters the workloads of a shared [Sc20RawParquetTraceReader].
+ *
+ * @param raw The shared raw trace reader to obtain the workloads from.
+ * @param performanceInterferenceModel The optional performance interference model covering the workloads in the VM trace.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20FilteringParquetTraceReader(
+ raw: Sc20RawParquetTraceReader,
+ performanceInterferenceModel: PerformanceInterferenceModel?,
+ selectedVms: Set<String>,
+ random: Random
+) : TraceReader<VmWorkload> {
+ /**
+ * The iterator over the actual trace.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>> =
+ raw.read()
+ .run {
+ // Apply VM selection filter
+ if (selectedVms.isEmpty())
+ this
+ else
+ filter { it.workload.image.name in selectedVms }
+ }
+ .run {
+ // Apply performance interference model
+ if (performanceInterferenceModel == null)
+ this
+ else
+ map { entry ->
+ val image = entry.workload.image
+ val id = image.name
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { id in it.workloadNames }.toSet(),
+ Random(random.nextInt())
+ )
+ val newImage =
+ VmImage(
+ image.uid,
+ image.name,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ image.flopsHistory,
+ image.maxCores,
+ image.requiredMemory
+ )
+ val newWorkload = entry.workload.copy(image = newImage)
+ Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
+ }
+ }
+ .iterator()
+
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
new file mode 100644
index 00000000..632746a2
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -0,0 +1,159 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import java.io.File
+import java.util.UUID
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A reader for the internal VM workload trace format that parses the trace into memory once, so it can be shared across simulations.
+ *
+ * @param path The directory containing the trace.parquet and meta.parquet files of the trace.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20RawParquetTraceReader(private val path: File) {
+ /**
+ * Read the fragments into memory.
+ */
+ private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .build()
+
+ val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>()
+
+ return try {
+ while (true) {
+ val record = reader.read() ?: break
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ }
+
+ fragments
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+     * Read the metadata into workload entries.
+ */
+ private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> {
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .build()
+
+ var counter = 0
+ val entries = mutableListOf<TraceEntryImpl>()
+
+ return try {
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+
+ val vmWorkload = VmWorkload(
+ uid, "VM Workload $id",
+ UnnamedUser,
+ VmImage(
+ uid,
+ id,
+ emptyMap(),
+ fragments.getValue(id).asSequence(),
+ maxCores,
+ requiredMemory
+ )
+ )
+
+ entries.add(TraceEntryImpl(submissionTime, vmWorkload))
+ }
+
+ entries
+ } finally {
+ metaReader.close()
+ }
+ }
+
+ /**
+ * The entries in the trace.
+ */
+ private val entries: Sequence<TraceEntryImpl>
+
+ init {
+ val fragments = parseFragments(path)
+ entries = parseMeta(path, fragments).asSequence()
+ }
+
+ /**
+ * Read the entries in the trace.
+ */
+ public fun read(): Sequence<TraceEntry<VmWorkload>> = entries
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ internal data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
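
For completeness, a hedged end-to-end usage sketch of the two new readers; the trace path and VM names are hypothetical placeholders rather than values from this commit:

import com.atlarge.opendc.experiments.sc20.trace.Sc20FilteringParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import java.io.File
import kotlin.random.Random

fun main() {
    // Parse the Parquet trace once; the instance can back any number of runs.
    val raw = Sc20RawParquetTraceReader(File("traces/example"))

    // One run restricted to two VMs, with its own interference seed.
    val reader = Sc20FilteringParquetTraceReader(raw, null, setOf("vm-001", "vm-002"), Random(42))
    while (reader.hasNext()) {
        val entry = reader.next()
        println("t=${entry.submissionTime} image=${entry.workload.image.name}")
    }
    reader.close() // no-op: the shared raw reader remains usable for the next run
}

Since read() returns a sequence backed by in-memory lists, draining one filtering reader does not exhaust the shared raw reader for subsequent runs.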