diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-23 16:24:05 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-23 16:24:05 +0200 |
| commit | ebcacf96fbc1cd16a91523f95dd01db046fb7f90 (patch) | |
| tree | e40d28015796c81dc80ef211a875cca4662a5642 /opendc | |
| parent | 128ee618c81820f5d44f5d309a5b222bf0787acd (diff) | |
| parent | e05aba274accc6006ffb2fb3fccc1d4399b4c4a0 (diff) | |
Merge branch '2.x-update-total-load' into '2.x'
Update total load to match filtered VM set
See merge request opendc/opendc-simulator!71
Diffstat (limited to 'opendc')
4 files changed, 52 insertions, 37 deletions
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt index 2904fbec..50261db5 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt @@ -31,6 +31,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.ensureActive import kotlinx.coroutines.launch import kotlin.math.exp +import kotlin.math.max import kotlin.random.Random import kotlin.random.asJavaRandom @@ -107,7 +108,7 @@ public class CorrelatedFaultInjector( failureDomain.fail() } - val df = lognvariate(dScale, dShape) * 6e4 + val df = max(lognvariate(dScale, dShape) * 6e4, 15 * 6e4) // Handle long overflow if (simulationContext.clock.millis() + df <= 0) { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index be40e3ca..a70297d2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -55,7 +55,6 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File -import java.util.TreeSet import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -106,7 +105,7 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double) domain, iatScale = ln(failureInterval), iatShape = 1.03, // Hours sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 - dScale = 9.51, dShape = 3.21, // Minutes + dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes random = random ) } @@ -211,26 +210,10 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP try { var submitted = 0 val finished = Channel<Unit>(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) while (reader.hasNext()) { val (time, workload) = reader.next() - if (vmPlacements.isNotEmpty() && workload.name.contains(".txt")) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - logger.warn { "Could not find placement data in VM placement file for VM $vmId" } - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false - if (machineInCluster) { - logger.info { "Ignored VM $vmId" } - continue - } - } - submitted++ delay(max(0, time - simulationContext.clock.millis())) domain.launch { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt index 89012036..4d8e2f1d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -84,7 +84,7 @@ public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(pare } public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "composite-workload") { - private val totalSampleLoad = 3425709788935.9976 + private val totalSampleLoad = 1.3301733005049648E12 override val topologies = listOf( Topology("base"), @@ -96,27 +96,27 @@ public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio override val workloads = listOf( CompositeWorkload( "all-azure", - listOf(Workload("solvinity", 0.0), Workload("azure", 1.0)), + listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), totalSampleLoad ), CompositeWorkload( "solvinity-25-azure-75", - listOf(Workload("solvinity", 0.25), Workload("azure", 0.75)), + listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), totalSampleLoad ), CompositeWorkload( "solvinity-50-azure-50", - listOf(Workload("solvinity", 0.5), Workload("azure", 0.5)), + listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), totalSampleLoad ), CompositeWorkload( "solvinity-75-azure-25", - listOf(Workload("solvinity", 0.75), Workload("azure", 0.25)), + listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), totalSampleLoad ), CompositeWorkload( "all-solvinity", - listOf(Workload("solvinity", 1.0), Workload("azure", 0.0)), + listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), totalSampleLoad ) ) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 32a188b5..0877ad52 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.experiments.sc20.trace +import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import me.tongfei.progressbar.ProgressBar import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -43,8 +44,8 @@ import kotlin.math.min * A script to convert a trace in text format into a Parquet trace. */ fun main(args: Array<String>) { - if (args.size < 3) { - println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> [<SEED>]") + if (args.size < 4) { + println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> <SEED/CLUSTERS+MAPPING>") return } @@ -98,7 +99,11 @@ fun main(args: Array<String>) { val traceType = args[2] val allFragments = if (traceType == "solvinity") { - readSolvinityTrace(traceDirectory, metaSchema, metaWriter) + val clusters = args[3].split(",") + val vmPlacementFile = File(args[4]) + val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct() + + readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements) } else { val seed = args[3].toLong() readAzureTrace(traceDirectory, metaSchema, metaWriter, seed) @@ -136,7 +141,9 @@ data class Fragment( fun readSolvinityTrace( traceDirectory: File, metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> + metaWriter: ParquetWriter<GenericData.Record>, + clusters: List<String>, + vmPlacements: Map<String, String> ): MutableList<Fragment> { val timestampCol = 0 val cpuUsageCol = 1 @@ -161,6 +168,14 @@ fun readSolvinityTrace( continue } + val vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + val values = line.split(" ") val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L @@ -177,6 +192,9 @@ fun readSolvinityTrace( val allFragments = mutableListOf<Fragment>() + val begin = 15 * 24 * 60 * 60 * 1000L + val end = 45 * 24 * 60 * 60 * 1000L + traceDirectory.walk() .filterNot { it.isDirectory } .filter { it.extension == "csv" || it.extension == "txt" } @@ -206,7 +224,18 @@ fun readSolvinityTrace( val values = line.split(" ") vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp + if (begin > timestamp || timestamp > end) { + continue + } + cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) @@ -252,18 +281,20 @@ fun readSolvinityTrace( } var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> + flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> allFragments.add(fragment) maxTime = max(maxTime, fragment.tick) } - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) + if (minTime in begin until end) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } } return allFragments |
