author    Georgios Andreadis <info@gandreadis.com>    2020-05-21 15:40:14 +0200
committer Georgios Andreadis <info@gandreadis.com>    2020-05-21 15:40:14 +0200
commit    5eb491bf7929bfc08b601727fe75f461b3b07a89 (patch)
tree      391a8203d2e4972a1595d50035ec692e0bc50340
parent    69186ef7f8b5eab10fb83e28e7c481126b3398c6 (diff)
Move filter to converter
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt  17
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt  30
2 files changed, 26 insertions, 21 deletions
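
The commit moves the VM placement filter out of processTrace (ExperimentHelpers.kt) and into readSolvinityTrace (Sc20TraceConverter.kt), so Solvinity traces are filtered once at conversion time rather than on every simulation run. As a consequence, the converter now takes the cluster list and the VM placement file on the command line. A minimal Kotlin sketch of the new invocation for the "solvinity" trace type; the paths, cluster names, and the demo function name are hypothetical placeholders, not part of this commit:

    // Sketch only: argument order follows the diff below; all values are example placeholders.
    import com.atlarge.opendc.experiments.sc20.trace.main as convertTrace

    fun demoConvertSolvinityTrace() {
        convertTrace(
            arrayOf(
                "parquet-out",          // <OUTPUT>: destination for the Parquet trace
                "solvinity-trace",      // <INPUT>: text-format source trace
                "solvinity",            // <TRACE-TYPE>: selects readSolvinityTrace
                "cluster-a,cluster-b",  // args[3]: comma-separated clusters to keep
                "vm-placements.txt"     // args[4]: placement file parsed by Sc20VmPlacementReader
            )
        )
    }
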
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index be40e3ca..5b208ff8 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -55,7 +55,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
-import java.util.TreeSet
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -211,26 +210,10 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
try {
var submitted = 0
val finished = Channel<Unit>(Channel.CONFLATED)
- val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
while (reader.hasNext()) {
val (time, workload) = reader.next()
- if (vmPlacements.isNotEmpty() && workload.name.contains(".txt")) {
- val vmId = workload.name.replace("VM Workload ", "")
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null) {
- logger.warn { "Could not find placement data in VM placement file for VM $vmId" }
- continue
- }
- val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
- if (machineInCluster) {
- logger.info { "Ignored VM $vmId" }
- continue
- }
- }
-
submitted++
delay(max(0, time - simulationContext.clock.millis()))
domain.launch {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 32a188b5..652551c6 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.experiments.sc20.trace
+import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -43,8 +44,8 @@ import kotlin.math.min
* A script to convert a trace in text format into a Parquet trace.
*/
fun main(args: Array<String>) {
- if (args.size < 3) {
- println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> [<SEED>]")
+ if (args.size < 4) {
+ println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> <SEED/CLUSTERS+MAPPING>")
return
}
@@ -98,7 +99,11 @@ fun main(args: Array<String>) {
val traceType = args[2]
val allFragments = if (traceType == "solvinity") {
- readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
+ val clusters = args[3].split(",")
+ val vmPlacementFile = File(args[4])
+ val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct()
+
+ readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements)
} else {
val seed = args[3].toLong()
readAzureTrace(traceDirectory, metaSchema, metaWriter, seed)
@@ -136,7 +141,9 @@ data class Fragment(
fun readSolvinityTrace(
traceDirectory: File,
metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
+ metaWriter: ParquetWriter<GenericData.Record>,
+ clusters: List<String>,
+ vmPlacements: Map<String, String>
): MutableList<Fragment> {
val timestampCol = 0
val cpuUsageCol = 1
@@ -161,6 +168,14 @@ fun readSolvinityTrace(
continue
}
+ val vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
val values = line.split(" ")
val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
@@ -206,6 +221,13 @@ fun readSolvinityTrace(
val values = line.split(" ")
vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())