From 5eb491bf7929bfc08b601727fe75f461b3b07a89 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Thu, 21 May 2020 15:40:14 +0200 Subject: Move filter to converter --- .../sc20/experiment/ExperimentHelpers.kt | 17 ------------ .../experiments/sc20/trace/Sc20TraceConverter.kt | 30 +++++++++++++++++++--- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index be40e3ca..5b208ff8 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -55,7 +55,6 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File -import java.util.TreeSet import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -211,26 +210,10 @@ suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtP try { var submitted = 0 val finished = Channel(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) while (reader.hasNext()) { val (time, workload) = reader.next() - if (vmPlacements.isNotEmpty() && workload.name.contains(".txt")) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - logger.warn { "Could not find placement data in VM placement file for VM $vmId" } - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false - if (machineInCluster) { - logger.info { "Ignored VM $vmId" } - continue - } - } - submitted++ delay(max(0, time - simulationContext.clock.millis())) domain.launch { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 32a188b5..652551c6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.experiments.sc20.trace +import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import me.tongfei.progressbar.ProgressBar import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -43,8 +44,8 @@ import kotlin.math.min * A script to convert a trace in text format into a Parquet trace. */ fun main(args: Array) { - if (args.size < 3) { - println("error: expected []") + if (args.size < 4) { + println("error: expected ") return } @@ -98,7 +99,11 @@ fun main(args: Array) { val traceType = args[2] val allFragments = if (traceType == "solvinity") { - readSolvinityTrace(traceDirectory, metaSchema, metaWriter) + val clusters = args[3].split(",") + val vmPlacementFile = File(args[4]) + val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct() + + readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements) } else { val seed = args[3].toLong() readAzureTrace(traceDirectory, metaSchema, metaWriter, seed) @@ -136,7 +141,9 @@ data class Fragment( fun readSolvinityTrace( traceDirectory: File, metaSchema: Schema, - metaWriter: ParquetWriter + metaWriter: ParquetWriter, + clusters: List, + vmPlacements: Map ): MutableList { val timestampCol = 0 val cpuUsageCol = 1 @@ -161,6 +168,14 @@ fun readSolvinityTrace( continue } + val vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + val values = line.split(" ") val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L @@ -206,6 +221,13 @@ fun readSolvinityTrace( val values = line.split(" ") vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) -- cgit v1.2.3