From 136c1b9ddc7bd9331d3552d681e9190fc6198271 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 3 Oct 2020 17:41:59 +0200 Subject: Migrate codebase to opendc-simulator-compute This change updates the remainder of the codebase to use the opendc-simulator-compute module for the simulation of workloads. --- .../opendc-experiments-sc18/build.gradle.kts | 5 ++- .../opendc-experiments-sc20/build.gradle.kts | 1 + .../sc20/experiment/ExperimentHelpers.kt | 11 ++++--- .../sc20/trace/Sc20ParquetTraceReader.kt | 10 +++--- .../sc20/trace/Sc20RawParquetTraceReader.kt | 22 ++++++------- .../sc20/trace/Sc20StreamingParquetTraceReader.kt | 36 ++++++++++++---------- .../experiments/sc20/trace/WorkloadSampler.kt | 8 ++--- .../src/main/resources/log4j2.xml | 2 +- .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 14 +++++---- 9 files changed, 57 insertions(+), 52 deletions(-) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index b12b1ae3..9cf72f18 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -37,7 +37,10 @@ dependencies { implementation(project(":opendc-format")) implementation(project(":opendc-workflows")) implementation(project(":opendc-simulator:opendc-simulator-core")) - implementation(kotlin("stdlib")) + implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { + exclude("org.jetbrains.kotlin", module = "kotlin-reflect") + } + implementation(kotlin("reflect")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts index 1a7c2f78..c0992fc9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { api(project(":opendc-core")) implementation(project(":opendc-format")) implementation(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation("com.github.ajalt:clikt:2.6.0") implementation("me.tongfei:progressbar:0.8.1") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 97d7d5da..7b6c4880 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -40,7 +40,7 @@ import org.opendc.compute.metal.NODE_CLUSTER import org.opendc.compute.metal.driver.BareMetalDriver import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.virt.HypervisorEvent -import org.opendc.compute.virt.driver.SimpleVirtDriver +import org.opendc.compute.virt.driver.SimVirtDriver import org.opendc.compute.virt.service.SimpleVirtProvisioningService import org.opendc.compute.virt.service.VirtProvisioningEvent import org.opendc.compute.virt.service.allocation.AllocationPolicy @@ -171,8 +171,9 @@ public suspend fun attachMonitor( // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) - hypervisor.server.events + val server = (hypervisor as SimVirtDriver).server + monitor.reportHostStateChange(clock.millis(), hypervisor, server) + server.events .onEach { event -> val time = clock.millis() when (event) { @@ -242,8 +243,8 @@ public suspend fun processTrace( workload.image.name, workload.image, Flavor( - workload.image.maxCores, - workload.image.requiredMemory + workload.image.tags["cores"] as Int, + workload.image.tags["required-memory"] as Long ) ) // Monitor server events diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 5a336865..39b00a61 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.sc20.trace -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload @@ -73,13 +73,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) val newImage = - VmImage( + SimWorkloadImage( image.uid, image.name, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - image.flopsHistory, - image.maxCores, - image.requiredMemory + image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + (image as SimWorkloadImage).workload ) val newWorkload = entry.workload.copy(image = newImage) Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 8965cf43..b40cad7c 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -26,12 +26,12 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File import java.util.UUID @@ -47,12 +47,12 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the fragments into memory. */ - private fun parseFragments(path: File): Map> { + private fun parseFragments(path: File): Map> { val reader = AvroParquetReader.builder(Path(path.absolutePath, "trace.parquet")) .disableCompatibility() .build() - val fragments = mutableMapOf>() + val fragments = mutableMapOf>() return try { while (true) { @@ -65,7 +65,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { val cpuUsage = record["cpuUsage"] as Double val flops = record["flops"] as Long - val fragment = FlopsHistoryFragment( + val fragment = SimTraceWorkload.Fragment( tick, flops, duration, @@ -85,7 +85,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map>): List { + private fun parseMeta(path: File, fragments: Map>): List { val metaReader = AvroParquetReader.builder(Path(path.absolutePath, "meta.parquet")) .disableCompatibility() .build() @@ -114,17 +114,17 @@ public class Sc20RawParquetTraceReader(private val path: File) { uid, id, UnnamedUser, - VmImage( + SimWorkloadImage( uid, id, mapOf( "submit-time" to submissionTime, "end-time" to endTime, - "total-load" to totalLoad + "total-load" to totalLoad, + "cores" to maxCores, + "required-memory" to requiredMemory ), - vmFragments, - maxCores, - requiredMemory + SimTraceWorkload(vmFragments) ) ) entries.add(TraceEntryImpl(submissionTime, vmWorkload)) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 41dc4b49..2ec01606 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -31,14 +31,14 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File import java.io.Serializable import java.util.SortedSet @@ -71,7 +71,7 @@ public class Sc20StreamingParquetTraceReader( /** * The intermediate buffer to store the read records in. */ - private val queue = ArrayBlockingQueue>(1024) + private val queue = ArrayBlockingQueue>(1024) /** * An optional filter for filtering the selected VMs @@ -92,7 +92,7 @@ public class Sc20StreamingParquetTraceReader( /** * A poisonous fragment. */ - private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0, 0, 0.0, 0)) /** * The thread to read the records in. @@ -119,7 +119,7 @@ public class Sc20StreamingParquetTraceReader( val cpuUsage = record["cpuUsage"] as Double val flops = record["flops"] as Long - val fragment = FlopsHistoryFragment( + val fragment = SimTraceWorkload.Fragment( tick, flops, duration, @@ -139,12 +139,12 @@ public class Sc20StreamingParquetTraceReader( /** * Fill the buffers with the VMs */ - private fun pull(buffers: Map>>) { + private fun pull(buffers: Map>>) { if (!hasNext) { return } - val fragments = mutableListOf>() + val fragments = mutableListOf>() queue.drainTo(fragments) for ((id, fragment) in fragments) { @@ -167,7 +167,7 @@ public class Sc20StreamingParquetTraceReader( init { val takenIds = mutableSetOf() val entries = mutableMapOf() - val buffers = mutableMapOf>>() + val buffers = mutableMapOf>>() val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() @@ -200,10 +200,10 @@ public class Sc20StreamingParquetTraceReader( logger.info("Processing VM $id") - val internalBuffer = mutableListOf() - val externalBuffer = mutableListOf() + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { + val fragments = sequence { repeat@ while (true) { if (externalBuffer.isEmpty()) { if (hasNext) { @@ -220,7 +220,7 @@ public class Sc20StreamingParquetTraceReader( for (fragment in internalBuffer) { yield(fragment) - if (fragment.tick >= endTime) { + if (fragment.time >= endTime) { break@repeat } } @@ -239,13 +239,15 @@ public class Sc20StreamingParquetTraceReader( uid, "VM Workload $id", UnnamedUser, - VmImage( + SimWorkloadImage( uid, id, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - fragments, - maxCores, - requiredMemory + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(fragments), ) ) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt index 19da97bb..e8cc2c36 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.sc20.trace import mu.KotlinLogging -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.VmWorkload import org.opendc.experiments.sc20.experiment.model.CompositeWorkload import org.opendc.experiments.sc20.experiment.model.SamplingStrategy @@ -197,13 +197,11 @@ public fun sampleHpcWorkload( */ private fun sample(entry: TraceEntry, i: Int): TraceEntry { val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = VmImage( + val image = SimWorkloadImage( id, entry.workload.image.name, entry.workload.image.tags, - entry.workload.image.flopsHistory, - entry.workload.image.maxCores, - entry.workload.image.requiredMemory + (entry.workload.image as SimWorkloadImage).workload ) val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) return VmTraceEntry(vmWorkload, entry.submissionTime) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml index 5ce99dfb..8029092e 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -30,7 +30,7 @@ - + diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt index 230e7f36..ff6e4bb9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -142,12 +142,14 @@ class Sc20IntegrationTest { runSimulation() // Note that these values have been verified beforehand - assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") - assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") - assertEquals(207379117949, monitor.totalRequestedBurst) - assertEquals(203388071813, monitor.totalGrantedBurst) - assertEquals(3991046136, monitor.totalOvercommissionedBurst) - assertEquals(0, monitor.totalInterferedBurst) + assertAll( + { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(207379117949, monitor.totalRequestedBurst) }, + { assertEquals(203388071813, monitor.totalGrantedBurst) }, + { assertEquals(3991046136, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) } @Test -- cgit v1.2.3