summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-23 16:24:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-23 16:24:05 +0200
commitebcacf96fbc1cd16a91523f95dd01db046fb7f90 (patch)
treee40d28015796c81dc80ef211a875cca4662a5642 /opendc/opendc-experiments-sc20/src
parent128ee618c81820f5d44f5d309a5b222bf0787acd (diff)
parente05aba274accc6006ffb2fb3fccc1d4399b4c4a0 (diff)
Merge branch '2.x-update-total-load' into '2.x'
Update total load to match filtered VM set. See merge request opendc/opendc-simulator!71
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt19
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt12
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt55
3 files changed, 50 insertions, 36 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index be40e3ca..a70297d2 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -55,7 +55,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
-import java.util.TreeSet
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -106,7 +105,7 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double)
domain,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
- dScale = 9.51, dShape = 3.21, // Minutes
+ dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
random = random
)
}
@@ -211,26 +210,10 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
try {
var submitted = 0
val finished = Channel<Unit>(Channel.CONFLATED)
- val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
while (reader.hasNext()) {
val (time, workload) = reader.next()
- if (vmPlacements.isNotEmpty() && workload.name.contains(".txt")) {
- val vmId = workload.name.replace("VM Workload ", "")
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null) {
- logger.warn { "Could not find placement data in VM placement file for VM $vmId" }
- continue
- }
- val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false
- if (machineInCluster) {
- logger.info { "Ignored VM $vmId" }
- continue
- }
- }
-
submitted++
delay(max(0, time - simulationContext.clock.millis()))
domain.launch {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
index 89012036..4d8e2f1d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
@@ -84,7 +84,7 @@ public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(pare
}
public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "composite-workload") {
- private val totalSampleLoad = 3425709788935.9976
+ private val totalSampleLoad = 1.3301733005049648E12
override val topologies = listOf(
Topology("base"),
@@ -96,27 +96,27 @@ public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio
override val workloads = listOf(
CompositeWorkload(
"all-azure",
- listOf(Workload("solvinity", 0.0), Workload("azure", 1.0)),
+ listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
totalSampleLoad
),
CompositeWorkload(
"solvinity-25-azure-75",
- listOf(Workload("solvinity", 0.25), Workload("azure", 0.75)),
+ listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
totalSampleLoad
),
CompositeWorkload(
"solvinity-50-azure-50",
- listOf(Workload("solvinity", 0.5), Workload("azure", 0.5)),
+ listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
totalSampleLoad
),
CompositeWorkload(
"solvinity-75-azure-25",
- listOf(Workload("solvinity", 0.75), Workload("azure", 0.25)),
+ listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
totalSampleLoad
),
CompositeWorkload(
"all-solvinity",
- listOf(Workload("solvinity", 1.0), Workload("azure", 0.0)),
+ listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
totalSampleLoad
)
)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 32a188b5..0877ad52 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.experiments.sc20.trace
+import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -43,8 +44,8 @@ import kotlin.math.min
* A script to convert a trace in text format into a Parquet trace.
*/
fun main(args: Array<String>) {
- if (args.size < 3) {
- println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> [<SEED>]")
+ if (args.size < 4) {
+ println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> <SEED/CLUSTERS+MAPPING>")
return
}
@@ -98,7 +99,11 @@ fun main(args: Array<String>) {
val traceType = args[2]
val allFragments = if (traceType == "solvinity") {
- readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
+ val clusters = args[3].split(",")
+ val vmPlacementFile = File(args[4])
+ val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct()
+
+ readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements)
} else {
val seed = args[3].toLong()
readAzureTrace(traceDirectory, metaSchema, metaWriter, seed)
@@ -136,7 +141,9 @@ data class Fragment(
fun readSolvinityTrace(
traceDirectory: File,
metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
+ metaWriter: ParquetWriter<GenericData.Record>,
+ clusters: List<String>,
+ vmPlacements: Map<String, String>
): MutableList<Fragment> {
val timestampCol = 0
val cpuUsageCol = 1
@@ -161,6 +168,14 @@ fun readSolvinityTrace(
continue
}
+ val vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
val values = line.split(" ")
val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
@@ -177,6 +192,9 @@ fun readSolvinityTrace(
val allFragments = mutableListOf<Fragment>()
+ val begin = 15 * 24 * 60 * 60 * 1000L
+ val end = 45 * 24 * 60 * 60 * 1000L
+
traceDirectory.walk()
.filterNot { it.isDirectory }
.filter { it.extension == "csv" || it.extension == "txt" }
@@ -206,7 +224,18 @@ fun readSolvinityTrace(
val values = line.split(" ")
vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
+ if (begin > timestamp || timestamp > end) {
+ continue
+ }
+
cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
@@ -252,18 +281,20 @@ fun readSolvinityTrace(
}
var maxTime = Long.MIN_VALUE
- flopsFragments.forEach { fragment ->
+ flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
allFragments.add(fragment)
maxTime = max(maxTime, fragment.tick)
}
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
+ if (minTime in begin until end) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
}
return allFragments