diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-03 17:41:59 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-04 16:35:21 +0200 |
| commit | 136c1b9ddc7bd9331d3552d681e9190fc6198271 (patch) | |
| tree | 19a04e730d93969542f726084444b9e7f13a7b8d /simulator/opendc-experiments/opendc-experiments-sc20/src | |
| parent | c8567a567348e13c341bf1a1ec64ed34ce25815a (diff) | |
Migrate codebase to opendc-simulator-compute
This change updates the remainder of the codebase to use the
opendc-simulator-compute module for the simulation of workloads.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-sc20/src')
7 files changed, 52 insertions, 51 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 97d7d5da..7b6c4880 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -40,7 +40,7 @@ import org.opendc.compute.metal.NODE_CLUSTER import org.opendc.compute.metal.driver.BareMetalDriver import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.virt.HypervisorEvent -import org.opendc.compute.virt.driver.SimpleVirtDriver +import org.opendc.compute.virt.driver.SimVirtDriver import org.opendc.compute.virt.service.SimpleVirtProvisioningService import org.opendc.compute.virt.service.VirtProvisioningEvent import org.opendc.compute.virt.service.allocation.AllocationPolicy @@ -171,8 +171,9 @@ public suspend fun attachMonitor( // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) - hypervisor.server.events + val server = (hypervisor as SimVirtDriver).server + monitor.reportHostStateChange(clock.millis(), hypervisor, server) + server.events .onEach { event -> val time = clock.millis() when (event) { @@ -242,8 +243,8 @@ public suspend fun processTrace( workload.image.name, workload.image, Flavor( - workload.image.maxCores, - workload.image.requiredMemory + workload.image.tags["cores"] as Int, + workload.image.tags["required-memory"] as Long ) ) // Monitor server events diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 5a336865..39b00a61 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.sc20.trace -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload @@ -73,13 +73,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) val newImage = - VmImage( + SimWorkloadImage( image.uid, image.name, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - image.flopsHistory, - image.maxCores, - image.requiredMemory + image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + (image as SimWorkloadImage).workload ) val newWorkload = entry.workload.copy(image = newImage) Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 8965cf43..b40cad7c 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -26,12 +26,12 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File import java.util.UUID @@ -47,12 +47,12 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the fragments into memory. */ - private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> { + private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> { val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) .disableCompatibility() .build() - val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>() + val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() return try { while (true) { @@ -65,7 +65,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { val cpuUsage = record["cpuUsage"] as Double val flops = record["flops"] as Long - val fragment = FlopsHistoryFragment( + val fragment = SimTraceWorkload.Fragment( tick, flops, duration, @@ -85,7 +85,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> { + private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> { val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) .disableCompatibility() .build() @@ -114,17 +114,17 @@ public class Sc20RawParquetTraceReader(private val path: File) { uid, id, UnnamedUser, - VmImage( + SimWorkloadImage( uid, id, mapOf( "submit-time" to submissionTime, "end-time" to endTime, - "total-load" to totalLoad + "total-load" to totalLoad, + "cores" to maxCores, + "required-memory" to requiredMemory ), - vmFragments, - maxCores, - requiredMemory + SimTraceWorkload(vmFragments) ) ) entries.add(TraceEntryImpl(submissionTime, vmWorkload)) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 41dc4b49..2ec01606 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -31,14 +31,14 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File import java.io.Serializable import java.util.SortedSet @@ -71,7 +71,7 @@ public class Sc20StreamingParquetTraceReader( /** * The intermediate buffer to store the read records in. */ - private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024) + private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024) /** * An optional filter for filtering the selected VMs @@ -92,7 +92,7 @@ public class Sc20StreamingParquetTraceReader( /** * A poisonous fragment. */ - private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0, 0, 0.0, 0)) /** * The thread to read the records in. @@ -119,7 +119,7 @@ public class Sc20StreamingParquetTraceReader( val cpuUsage = record["cpuUsage"] as Double val flops = record["flops"] as Long - val fragment = FlopsHistoryFragment( + val fragment = SimTraceWorkload.Fragment( tick, flops, duration, @@ -139,12 +139,12 @@ public class Sc20StreamingParquetTraceReader( /** * Fill the buffers with the VMs */ - private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) { + private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) { if (!hasNext) { return } - val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>() + val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>() queue.drainTo(fragments) for ((id, fragment) in fragments) { @@ -167,7 +167,7 @@ public class Sc20StreamingParquetTraceReader( init { val takenIds = mutableSetOf<UUID>() val entries = mutableMapOf<String, GenericData.Record>() - val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>() + val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() @@ -200,10 +200,10 @@ public class Sc20StreamingParquetTraceReader( logger.info("Processing VM $id") - val internalBuffer = mutableListOf<FlopsHistoryFragment>() - val externalBuffer = mutableListOf<FlopsHistoryFragment>() + val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>() + val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence<FlopsHistoryFragment> { + val fragments = sequence { repeat@ while (true) { if (externalBuffer.isEmpty()) { if (hasNext) { @@ -220,7 +220,7 @@ public class Sc20StreamingParquetTraceReader( for (fragment in internalBuffer) { yield(fragment) - if (fragment.tick >= endTime) { + if (fragment.time >= endTime) { break@repeat } } @@ -239,13 +239,15 @@ public class Sc20StreamingParquetTraceReader( uid, "VM Workload $id", UnnamedUser, - VmImage( + SimWorkloadImage( uid, id, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - fragments, - maxCores, - requiredMemory + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(fragments), ) ) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt index 19da97bb..e8cc2c36 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.sc20.trace import mu.KotlinLogging -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.VmWorkload import org.opendc.experiments.sc20.experiment.model.CompositeWorkload import org.opendc.experiments.sc20.experiment.model.SamplingStrategy @@ -197,13 +197,11 @@ public fun sampleHpcWorkload( */ private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> { val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = VmImage( + val image = SimWorkloadImage( id, entry.workload.image.name, entry.workload.image.tags, - entry.workload.image.flopsHistory, - entry.workload.image.maxCores, - entry.workload.image.requiredMemory + (entry.workload.image as SimWorkloadImage).workload ) val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) return VmTraceEntry(vmWorkload, entry.submissionTime) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml index 5ce99dfb..8029092e 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -30,7 +30,7 @@ </Console> </Appenders> <Loggers> - <Logger name="org.opendc" level="warn" additivity="false"> + <Logger name="org.opendc" level="debug" additivity="false"> <AppenderRef ref="Console"/> </Logger> <Logger name="org.opendc.experiments.sc20" level="info" additivity="false"> diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt index 230e7f36..ff6e4bb9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -142,12 +142,14 @@ class Sc20IntegrationTest { runSimulation() // Note that these values have been verified beforehand - assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") - assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") - assertEquals(207379117949, monitor.totalRequestedBurst) - assertEquals(203388071813, monitor.totalGrantedBurst) - assertEquals(3991046136, monitor.totalOvercommissionedBurst) - assertEquals(0, monitor.totalInterferedBurst) + assertAll( + { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(207379117949, monitor.totalRequestedBurst) }, + { assertEquals(203388071813, monitor.totalGrantedBurst) }, + { assertEquals(3991046136, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) } @Test |
