| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-21 11:42:56 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-21 11:42:56 +0200 |
| commit | 128ee618c81820f5d44f5d309a5b222bf0787acd (patch) | |
| tree | a8c83723dae5223b2c5443f64587fcbc38c492b7 | |
| parent | 70ad01d793f88b1bef7d7988d24bff384ddbb3b9 (diff) | |
| parent | f6f685196d6579d9866d2a04c2c01a63e8c169d7 (diff) | |
Merge branch '2.x-azure' into '2.x'
Add Azure trace reader
See merge request opendc/opendc-simulator!68
14 files changed, 412 insertions, 78 deletions
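The bulk of this merge is the new Azure trace support in `Sc20TraceConverter.kt` (see the diff below). The key step there is collapsing consecutive idle CPU readings of a VM into a single, longer trace fragment before the fragments are written to Parquet. The standalone Kotlin sketch below illustrates only that merging idea: `Fragment` mirrors the data class added in the commit, but `Reading` and `mergeReadings` are hypothetical simplifications introduced here for illustration, not code from the merge request.

```kotlin
// Illustrative sketch of the idle-fragment merging used by the Azure reader in the diff below.
// `Reading` and `mergeReadings` are hypothetical; only `Fragment` mirrors the commit.

data class Fragment(
    val id: String,
    val tick: Long,
    val flops: Long,
    val duration: Long,
    val usage: Double,
    val cores: Int
)

data class Reading(val vmId: String, val timestamp: Long, val cpuUsageMhz: Double, val cores: Int)

fun mergeReadings(readings: List<Reading>, intervalMs: Long = 5 * 60 * 1000L): List<Fragment> {
    val fragments = mutableListOf<Fragment>()
    var last: Fragment? = null

    for (r in readings) {
        val flops = (r.cpuUsageMhz * 5 * 60).toLong() // avg MHz * interval length = MFLOPs
        val prev = last
        last = if (prev != null && prev.flops == 0L && flops == 0L) {
            // Two consecutive idle readings collapse into one longer fragment.
            prev.copy(duration = prev.duration + intervalMs, usage = r.cpuUsageMhz)
        } else {
            // A non-idle reading (or the first reading) starts a new fragment.
            if (prev != null) fragments.add(prev)
            Fragment(r.vmId, r.timestamp, flops, intervalMs, r.cpuUsageMhz, r.cores)
        }
    }
    last?.let { fragments.add(it) }
    return fragments
}

fun main() {
    val readings = listOf(
        Reading("vm-a", 0L, 0.0, 2),
        Reading("vm-a", 300_000L, 0.0, 2),   // merged with the previous idle reading
        Reading("vm-a", 600_000L, 1200.0, 2) // new fragment starts here
    )
    mergeReadings(readings).forEach(::println)
}
```

Merging idle intervals keeps the converted trace compact without losing information: a single zero-usage fragment of ten minutes is equivalent to two consecutive zero-usage fragments of five minutes each.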
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt
index f88eaed8..59acfce2 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt
@@ -2,9 +2,12 @@ package com.atlarge.opendc.compute.virt.service.allocation
 
 import com.atlarge.opendc.compute.virt.service.HypervisorView
 import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import mu.KotlinLogging
+
+private val logger = KotlinLogging.logger {}
 
 /**
- * Policy replaying VM-cluster assignnment.
+ * Policy replaying VM-cluster assignment.
  *
  * Within each cluster, the active servers on each node determine which node gets
  * assigned the VM image.
@@ -18,8 +21,14 @@ class ReplayAllocationPolicy(val vmPlacements: Map<String, String>) : Allocation
             val clusterName = vmPlacements[image.name]
                 ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}")
             val machinesInCluster = hypervisors.filter { it.server.name.contains(clusterName) }
+
+            if (machinesInCluster.isEmpty()) {
+                logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." }
+                return hypervisors.maxBy { it.availableMemory }
+            }
+
             return machinesInCluster.maxBy { it.availableMemory }
-                ?: throw IllegalStateException("Cloud not find any machines belonging to cluster $clusterName for image ${image.name}")
+                ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign")
         }
     }
 }
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 14de52b8..677af381 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -24,12 +24,13 @@
 
 package com.atlarge.opendc.experiments.sc20
 
+import com.atlarge.opendc.experiments.sc20.experiment.CompositeWorkloadPortfolio
 import com.atlarge.opendc.experiments.sc20.experiment.Experiment
 import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio
-import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio
 import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio
 import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio
 import com.atlarge.opendc.experiments.sc20.experiment.Portfolio
+import com.atlarge.opendc.experiments.sc20.experiment.ReplayPortfolio
 import com.atlarge.opendc.experiments.sc20.experiment.TestPortfolio
 import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter
 import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
@@ -96,10 +97,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
      */
     private val portfolios by option("--portfolio")
         .choice(
-            "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio,
+            "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) }
+                as (Experiment, Int) -> Portfolio,
             "more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) },
-            "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) },
+            "composite-workload" to { experiment, i -> CompositeWorkloadPortfolio(experiment, i) },
             "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) },
+            "replay" to { experiment, i -> ReplayPortfolio(experiment, i) },
             "test" to { experiment, i -> TestPortfolio(experiment, i) },
             ignoreCase = true
         )
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 9d2b0247..be40e3ca 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -216,7 +216,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
     while (reader.hasNext()) {
         val (time, workload) = reader.next()
 
-        if (vmPlacements.isNotEmpty()) {
+        if (vmPlacements.isNotEmpty() && workload.name.contains(".txt")) {
             val vmId = workload.name.replace("VM Workload ", "")
             // Check if VM in topology
             val clusterName = vmPlacements[vmId]
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
index 362144ae..89012036 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
@@ -24,6 +24,7 @@
 
 package com.atlarge.opendc.experiments.sc20.experiment
 
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
 import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
 import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
 import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
@@ -82,7 +83,9 @@ public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(pare
     )
 }
 
-public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") {
+public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "composite-workload") {
+    private val totalSampleLoad = 3425709788935.9976
+
     override val topologies = listOf(
         Topology("base"),
         Topology("exp-vol-hor-hom"),
@@ -91,14 +94,35 @@ public class MoreHpcPortfolio(parent: Experiment, i
     )
 
     override val workloads = listOf(
-        Workload("solvinity", 0.1),
-        Workload("solvinity", 0.25),
-        Workload("solvinity", 0.5),
-        Workload("solvinity", 1.0)
+        CompositeWorkload(
+            "all-azure",
+            listOf(Workload("solvinity", 0.0), Workload("azure", 1.0)),
+            totalSampleLoad
+        ),
+        CompositeWorkload(
+            "solvinity-25-azure-75",
+            listOf(Workload("solvinity", 0.25), Workload("azure", 0.75)),
+            totalSampleLoad
+        ),
+        CompositeWorkload(
+            "solvinity-50-azure-50",
+            listOf(Workload("solvinity", 0.5), Workload("azure", 0.5)),
+            totalSampleLoad
+        ),
+        CompositeWorkload(
+            "solvinity-75-azure-25",
+            listOf(Workload("solvinity", 0.75), Workload("azure", 0.25)),
+            totalSampleLoad
+        ),
+        CompositeWorkload(
+            "all-solvinity",
+            listOf(Workload("solvinity", 1.0), Workload("azure", 0.0)),
+            totalSampleLoad
+        )
     )
 
     override val operationalPhenomenas = listOf(
-        OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+        OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false)
     )
 
     override val allocationPolicies = listOf(
@@ -136,6 +160,25 @@ public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfo
     )
 }
 
+public class ReplayPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "replay") {
+    override val topologies = listOf(
+        Topology("base")
+    )
+
+    override val workloads = listOf(
+        Workload("solvinity", 1.0)
+    )
+
+    override val operationalPhenomenas = listOf(
+        OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+    )
+
+    override val allocationPolicies = listOf(
+        "replay",
+        "active-servers"
+    )
+}
+
 public class TestPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "test") {
     override val repetitions: Int = 1
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index fd3e29c8..5d1c29e2 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersA
 import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
 import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
 import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
 import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
 import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor
 import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
@@ -78,23 +79,32 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
             "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
             "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
             "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
-            "replay" -> ReplayAllocationPolicy(emptyMap())
+            "replay" -> ReplayAllocationPolicy(experiment.vmPlacements)
             else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}")
         }
 
         @Suppress("UNCHECKED_CAST")
-        val rawTraceReaders = context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader>
-        val raw = synchronized(rawTraceReaders) {
-            val name = parent.workload.name
-            rawTraceReaders.computeIfAbsent(name) {
-                logger.info { "Loading trace $name" }
-                Sc20RawParquetTraceReader(File(experiment.traces, name))
+        val rawTraceReaders =
+            context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader>
+        val rawReaders = synchronized(rawTraceReaders) {
+            val workloadNames = if (parent.workload is CompositeWorkload) {
+                parent.workload.workloads.map { it.name }
+            } else {
+                listOf(parent.workload.name)
+            }
+
+            workloadNames.map { workloadName ->
+                rawTraceReaders.computeIfAbsent(workloadName) {
+                    logger.info { "Loading trace $workloadName" }
+                    Sc20RawParquetTraceReader(File(experiment.traces, workloadName))
+                }
             }
         }
+
         val performanceInterferenceModel = experiment.performanceInterferenceModel
             ?.takeIf { parent.operationalPhenomena.hasInterference }
             ?.construct(seeder) ?: emptyMap()
-        val trace = Sc20ParquetTraceReader(raw, performanceInterferenceModel, parent.workload, seed)
+        val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, parent.workload, seed)
 
         val monitor = ParquetExperimentMonitor(this)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
index 2dbdf570..cc3c448a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
@@ -27,4 +27,10 @@ package com.atlarge.opendc.experiments.sc20.experiment.model
 /**
  * A workload that is considered for a scenario.
  */
-public class Workload(val name: String, val fraction: Double)
+public open class Workload(open val name: String, val fraction: Double)
+
+/**
+ * A workload that is composed of multiple workloads.
+ */
+public class CompositeWorkload(override val name: String, val workloads: List<Workload>, val totalLoad: Double) :
+    Workload(name, -1.0)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index 7f71eb3e..be60e5b7 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -131,7 +131,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
             interferedBurst,
             cpuUsage,
             cpuDemand,
-            lastPowerConsumption[hostServer] ?: 200.0
+            lastPowerConsumption[hostServer] ?: 200.0,
+            hostServer.flavor.cpuCount
         )
 
         currentHostEvent[hostServer] = event
@@ -148,7 +149,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
             interferedBurst,
             cpuUsage,
             cpuDemand,
-            lastPowerConsumption[hostServer] ?: 200.0
+            lastPowerConsumption[hostServer] ?: 200.0,
+            hostServer.flavor.cpuCount
        )
 
         currentHostEvent[hostServer] = event
@@ -167,7 +169,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
             interferedBurst,
             cpuUsage,
             cpuDemand,
-            lastPowerConsumption[hostServer] ?: 200.0
+            lastPowerConsumption[hostServer] ?: 200.0,
+            hostServer.flavor.cpuCount
         )
 
         currentHostEvent[hostServer] = event
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
index 8e91bca2..b9030172 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
@@ -40,5 +40,6 @@ data class HostEvent(
     val interferedBurst: Long,
     val cpuUsage: Double,
     val cpuDemand: Double,
-    val powerDraw: Double
+    val powerDraw: Double,
+    val cores: Int
 ) : Event("host-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
index 523897b0..3bc09435 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -55,6 +55,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
         record.put("cpu_usage", event.cpuUsage)
         record.put("cpu_demand", event.cpuDemand)
         record.put("power_draw", event.powerDraw * (1.0 / 12))
+        record.put("cores", event.cores)
     }
 
     val schema: Schema = SchemaBuilder
@@ -76,6 +77,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
             .name("cpu_usage").type().doubleType().noDefault()
             .name("cpu_demand").type().doubleType().noDefault()
             .name("power_draw").type().doubleType().noDefault()
+            .name("cores").type().intType().noDefault()
             .endRecord()
     }
 }
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
index ad50bf18..06bececf 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -28,6 +28,7 @@ import com.atlarge.opendc.compute.core.image.VmImage
 import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
 import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
 import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
 import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
 import com.atlarge.opendc.format.trace.TraceEntry
 import com.atlarge.opendc.format.trace.TraceReader
@@ -42,7 +43,7 @@ import java.util.TreeSet
  */
 @OptIn(ExperimentalStdlibApi::class)
 class Sc20ParquetTraceReader(
-    raw: Sc20RawParquetTraceReader,
+    rawReaders: List<Sc20RawParquetTraceReader>,
     performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
     workload: Workload,
     seed: Int
@@ -51,8 +52,17 @@ class Sc20ParquetTraceReader(
      * The iterator over the actual trace.
      */
    private val iterator: Iterator<TraceEntry<VmWorkload>> =
-        raw.read()
-            .run { sampleWorkload(this, workload, seed) }
+        rawReaders
+            .map { it.read() }
+            .run {
+                if (workload is CompositeWorkload) {
+                    this.zip(workload.workloads)
+                } else {
+                    this.zip(listOf(workload))
+                }
+            }
+            .map { sampleWorkload(it.first, workload, it.second, seed) }
+            .flatten()
             .run {
                 // Apply performance interference model
                 if (performanceInterferenceModel.isEmpty())
@@ -62,7 +72,7 @@ class Sc20ParquetTraceReader(
                         val image = entry.workload.image
                         val id = image.name
                         val relevantPerformanceInterferenceModelItems =
-                            performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
+                        performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
                         val newImage = VmImage(
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index 3b480d33..652f7746 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -98,15 +98,18 @@ class Sc20RawParquetTraceReader(private val path: File) {
         return try {
             while (true) {
                 val record = metaReader.read() ?: break
+                val id = record["id"].toString()
+                if (!fragments.containsKey(id)) {
+                    continue
+                }
+
                 val submissionTime = record["submissionTime"] as Long
                 val endTime = record["endTime"] as Long
                 val maxCores = record["maxCores"] as Int
                 val requiredMemory = record["requiredMemory"] as Long
                 val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
 
-                logger.info { "VM $id" }
-
                 val vmFragments = fragments.getValue(id).asSequence()
                 val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
                 val vmWorkload = VmWorkload(
@@ -129,6 +132,9 @@ class Sc20RawParquetTraceReader(private val path: File) {
             }
 
             entries
+        } catch (e: Exception) {
+            e.printStackTrace()
+            throw e
         } finally {
             metaReader.close()
         }
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 04cdd302..32a188b5 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -24,14 +24,18 @@
 
 package com.atlarge.opendc.experiments.sc20.trace
 
+import me.tongfei.progressbar.ProgressBar
+import org.apache.avro.Schema
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import java.io.BufferedReader
 import java.io.File
 import java.io.FileReader
+import java.util.Random
 import kotlin.math.max
 import kotlin.math.min
@@ -39,8 +43,8 @@ import kotlin.math.min
  * A script to convert a trace in text format into a Parquet trace.
  */
 fun main(args: Array<String>) {
-    if (args.size < 2) {
-        println("error: expected <INPUT> <OUTPUT>")
+    if (args.size < 3) {
+        println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> [<SEED>]")
         return
     }
@@ -66,31 +70,117 @@ fun main(args: Array<String>) {
         .name("flops").type().longType().noDefault()
         .endRecord()
 
-    val timestampCol = 0
-    val cpuUsageCol = 1
-    val coreCol = 12
-    val vmIdCol = 19
-    val provisionedMemoryCol = 20
-    val traceInterval = 5 * 60 * 1000L
-
     val dest = File(args[0])
     val traceDirectory = File(args[1])
 
-    val vms =
-        traceDirectory.walk()
-            .filterNot { it.isDirectory }
-            .filter { it.extension == "csv" || it.extension == "txt" }
-            .toList()
+    val metaParquet = File(dest.absolutePath, "meta.parquet")
+    val traceParquet = File(dest.absolutePath, "trace.parquet")
+
+    if (metaParquet.exists()) {
+        metaParquet.delete()
+    }
+    if (traceParquet.exists()) {
+        traceParquet.delete()
+    }
 
-    val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
+    val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
         .withSchema(metaSchema)
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withPageSize(4 * 1024 * 1024) // For compression
        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
         .build()
 
+    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+        .withSchema(schema)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .withPageSize(4 * 1024 * 1024) // For compression
+        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+        .build()
+
+    val traceType = args[2]
+    val allFragments = if (traceType == "solvinity") {
+        readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
+    } else {
+        val seed = args[3].toLong()
+        readAzureTrace(traceDirectory, metaSchema, metaWriter, seed)
+    }
+    allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+    for (fragment in allFragments) {
+        val record = GenericData.Record(schema)
+        record.put("id", fragment.id)
+        record.put("time", fragment.tick)
+        record.put("duration", fragment.duration)
+        record.put("cores", fragment.cores)
+        record.put("cpuUsage", fragment.usage)
+        record.put("flops", fragment.flops)
+
+        writer.write(record)
+    }
+
+    writer.close()
+    metaWriter.close()
+}
+
+data class Fragment(
+    val id: String,
+    val tick: Long,
+    val flops: Long,
+    val duration: Long,
+    val usage: Double,
+    val cores: Int
+)
+
+/**
+ * Reads the confidential Solvinity trace.
+ */
+fun readSolvinityTrace(
+    traceDirectory: File,
+    metaSchema: Schema,
+    metaWriter: ParquetWriter<GenericData.Record>
+): MutableList<Fragment> {
+    val timestampCol = 0
+    val cpuUsageCol = 1
+    val coreCol = 12
+    val provisionedMemoryCol = 20
+    val traceInterval = 5 * 60 * 1000L
+
+    // Identify start time of the entire trace
+    var minTimestamp = Long.MAX_VALUE
+    traceDirectory.walk()
+        .filterNot { it.isDirectory }
+        .filter { it.extension == "csv" || it.extension == "txt" }
+        .toList()
+        .forEach { vmFile ->
+            BufferedReader(FileReader(vmFile)).use { reader ->
+                reader.lineSequence()
+                    .chunked(128)
+                    .forEachIndexed { idx, lines ->
+                        for (line in lines) {
+                            // Ignore comments in the trace
+                            if (line.startsWith("#") || line.isBlank()) {
+                                continue
+                            }
+
+                            val values = line.split(" ")
+                            val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+                            if (timestamp < minTimestamp) {
+                                minTimestamp = timestamp
+                            }
+                            return@forEach
+                        }
+                    }
+            }
+        }
+
+    println("Start of trace at $minTimestamp")
+
     val allFragments = mutableListOf<Fragment>()
 
-    vms
+    traceDirectory.walk()
+        .filterNot { it.isDirectory }
+        .filter { it.extension == "csv" || it.extension == "txt" }
+        .toList()
         .forEachIndexed { idx, vmFile ->
             println(vmFile)
@@ -116,7 +206,7 @@ fun main(args: Array<String>) {
                         val values = line.split(" ")
 
                         vmId = vmFile.name
-                        val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+                        val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
                         cores = values[coreCol].trim().toInt()
                         requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
                         maxCores = max(maxCores, cores)
@@ -176,29 +266,164 @@ fun main(args: Array<String>) {
         metaWriter.write(metaRecord)
     }
 
-    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
-        .withSchema(schema)
-        .withCompressionCodec(CompressionCodecName.SNAPPY)
-        .withPageSize(4 * 1024 * 1024) // For compression
-        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
-        .build()
+    return allFragments
+}
 
-    allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+/**
+ * Reads the Azure cloud trace.
+ *
+ * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace.
+ */
+fun readAzureTrace(
+    traceDirectory: File,
+    metaSchema: Schema,
+    metaWriter: ParquetWriter<GenericData.Record>,
+    seed: Long
+): MutableList<Fragment> {
+    val random = Random(seed)
+    val fraction = 0.01
 
-    for (fragment in allFragments) {
-        val record = GenericData.Record(schema)
-        record.put("id", fragment.id)
-        record.put("time", fragment.tick)
-        record.put("duration", fragment.duration)
-        record.put("cores", fragment.cores)
-        record.put("cpuUsage", fragment.usage)
-        record.put("flops", fragment.flops)
+    // Read VM table
+    val vmIdTableCol = 0
+    val coreTableCol = 9
+    val provisionedMemoryTableCol = 10
 
-        writer.write(record)
+    var vmId: String
+    var cores: Int
+    var requiredMemory: Long
+
+    val vmIds = mutableSetOf<String>()
+    val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+    BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+        reader.lineSequence()
+            .chunked(1024)
+            .forEach { lines ->
+                for (line in lines) {
+                    // Ignore comments in the trace
+                    if (line.startsWith("#") || line.isBlank()) {
+                        continue
+                    }
+                    // Sample only a fraction of the VMs
+                    if (random.nextDouble() > fraction) {
+                        continue
+                    }
+
+                    val values = line.split(",")
+
+                    // Exclude VMs with a large number of cores (not specified exactly)
+                    if (values[coreTableCol].contains(">")) {
+                        continue
+                    }
+
+                    vmId = values[vmIdTableCol].trim()
+                    cores = values[coreTableCol].trim().toInt()
+                    requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+                    vmIds.add(vmId)
+                    vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+                }
+            }
     }
 
-    writer.close()
-    metaWriter.close()
+    // Read VM metric reading files
+    val timestampCol = 0
+    val vmIdCol = 1
+    val cpuUsageCol = 4
+    val traceInterval = 5 * 60 * 1000L
+
+    val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+    val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+    val allFragments = mutableListOf<Fragment>()
+
+    for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
+        val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
+        var timestamp: Long
+        var cpuUsage: Double
+
+        BufferedReader(FileReader(readingsFile)).use { reader ->
+            reader.lineSequence()
+                .chunked(128)
+                .forEach { lines ->
+                    for (line in lines) {
+                        // Ignore comments in the trace
+                        if (line.startsWith("#") || line.isBlank()) {
+                            continue
+                        }
+
+                        val values = line.split(",")
+                        vmId = values[vmIdCol].trim()
+
+                        // Ignore readings for VMs not in the sample
+                        if (!vmIds.contains(vmId)) {
+                            continue
+                        }
+
+                        timestamp = values[timestampCol].trim().toLong() * 1000L
+                        vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+                        cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
+                        vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+                        val flops: Long = (cpuUsage * 5 * 60).toLong()
+                        val lastFragment = vmIdToLastFragment[vmId]
+
+                        vmIdToLastFragment[vmId] =
+                            if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+                                Fragment(
+                                    vmId,
+                                    lastFragment.tick,
+                                    lastFragment.flops + flops,
+                                    lastFragment.duration + traceInterval,
+                                    cpuUsage,
+                                    vmIdToMetadata[vmId]!!.cores
+                                )
+                            } else {
+                                val fragment =
+                                    Fragment(
+                                        vmId,
+                                        timestamp,
+                                        flops,
+                                        traceInterval,
+                                        cpuUsage,
+                                        vmIdToMetadata[vmId]!!.cores
+                                    )
+                                if (lastFragment != null) {
+                                    if (vmIdToFragments[vmId] == null) {
+                                        vmIdToFragments[vmId] = mutableListOf()
+                                    }
+                                    vmIdToFragments[vmId]!!.add(lastFragment)
+                                    allFragments.add(lastFragment)
+                                }
+                                fragment
+                            }
+                    }
+                }
+        }
+    }
+
+    for (entry in vmIdToLastFragment) {
+        if (entry.value != null) {
+            if (vmIdToFragments[entry.key] == null) {
+                vmIdToFragments[entry.key] = mutableListOf()
+            }
+            vmIdToFragments[entry.key]!!.add(entry.value!!)
+        }
+    }
+
+    println("Read ${vmIdToLastFragment.size} VMs")
+
+    for (entry in vmIdToMetadata) {
+        val metaRecord = GenericData.Record(metaSchema)
+        metaRecord.put("id", entry.key)
+        metaRecord.put("submissionTime", entry.value.minTime)
+        metaRecord.put("endTime", entry.value.maxTime)
+        println("${entry.value.minTime} - ${entry.value.maxTime}")
+        metaRecord.put("maxCores", entry.value.cores)
+        metaRecord.put("requiredMemory", entry.value.requiredMemory)
+        metaWriter.write(metaRecord)
+    }
+
+    return allFragments
 }
-data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)
+class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
index a8580686..dd70d4f1 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -25,6 +25,7 @@
 package com.atlarge.opendc.experiments.sc20.trace
 
 import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
 import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
 import com.atlarge.opendc.format.trace.TraceEntry
 import mu.KotlinLogging
@@ -35,22 +36,37 @@ private val logger = KotlinLogging.logger {}
 /**
  * Sample the workload for the specified [run].
  */
-fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> {
-    return sampleRegularWorkload(trace, workload, seed)
+fun sampleWorkload(
+    trace: List<TraceEntry<VmWorkload>>,
+    workload: Workload,
+    subWorkload: Workload,
+    seed: Int
+): List<TraceEntry<VmWorkload>> {
+    return if (workload is CompositeWorkload) {
+        sampleRegularWorkload(trace, workload, subWorkload, seed)
+    } else {
+        sampleRegularWorkload(trace, workload, workload, seed)
+    }
 }
 
 /**
  * Sample a regular (non-HPC) workload.
  */
-fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> {
-    val fraction = workload.fraction
-    if (fraction >= 1) {
-        return trace
-    }
+fun sampleRegularWorkload(
+    trace: List<TraceEntry<VmWorkload>>,
+    workload: Workload,
+    subWorkload: Workload,
+    seed: Int
+): List<TraceEntry<VmWorkload>> {
+    val fraction = subWorkload.fraction
 
     val shuffled = trace.shuffled(Random(seed))
     val res = mutableListOf<TraceEntry<VmWorkload>>()
-    val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+    val totalLoad = if (workload is CompositeWorkload) {
+        workload.totalLoad
+    } else {
+        shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+    }
     var currentLoad = 0.0
 
     for (entry in shuffled) {
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 68c2cbc5..5ecf7605 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -145,8 +145,8 @@ class Sc20IntegrationTest {
         assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs")
         assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run")
         assertEquals(207379117949, monitor.totalRequestedBurst)
-        assertEquals(207102919834, monitor.totalGrantedBurst)
-        assertEquals(276198896, monitor.totalOvercommissionedBurst)
+        assertEquals(203388071813, monitor.totalGrantedBurst)
+        assertEquals(3991046136, monitor.totalOvercommissionedBurst)
         assertEquals(0, monitor.totalInterferedBurst)
     }
 
@@ -204,7 +204,7 @@ class Sc20IntegrationTest {
      */
     private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
         return Sc20ParquetTraceReader(
-            Sc20RawParquetTraceReader(File("src/test/resources/trace")),
+            listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
             emptyMap(),
             Workload("test", fraction),
             seed
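Stepping back from the diff: the reworked `Sc20ParquetTraceReader` is what ties the composite workloads together — it pairs each raw trace reader with its sub-workload, samples each sub-trace against the shared total load, and concatenates the results. A minimal sketch of that zip-sample-flatten pattern follows; `Entry` and `sample` are hypothetical stand-ins for the project's `TraceEntry` and `sampleWorkload`, and the cut-off condition in `sample` is an assumption about how the load fraction is applied.

```kotlin
// Minimal sketch of the zip-sample-flatten pattern used by the reworked
// Sc20ParquetTraceReader; `Entry` and `sample` are hypothetical stand-ins.

data class Workload(val name: String, val fraction: Double)
data class Entry(val vm: String, val load: Double)

// Keep entries from a sub-trace until its fraction of the shared total load is reached
// (assumed cut-off; the real loop body is not part of this diff).
fun sample(entries: List<Entry>, fraction: Double, totalLoad: Double): List<Entry> {
    val result = mutableListOf<Entry>()
    var current = 0.0
    for (entry in entries) {
        if (current + entry.load > fraction * totalLoad) break
        current += entry.load
        result += entry
    }
    return result
}

fun main() {
    // One list of entries per raw trace reader, in the same order as the sub-workloads.
    val traces = listOf(
        listOf(Entry("solvinity-1", 100.0), Entry("solvinity-2", 200.0)),
        listOf(Entry("azure-1", 150.0), Entry("azure-2", 50.0))
    )
    val subWorkloads = listOf(Workload("solvinity", 0.5), Workload("azure", 0.5))
    val totalLoad = traces.flatten().sumByDouble { it.load }

    val combined = traces
        .zip(subWorkloads)
        .map { (entries, sub) -> sample(entries, sub.fraction, totalLoad) }
        .flatten()

    println(combined)
}
```

With the 50/50 split above, each sub-trace contributes entries until roughly half of the combined load is reached, which is presumably how `CompositeWorkload` keeps the total sampled load comparable across the solvinity/azure mixes in `CompositeWorkloadPortfolio`.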
