diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-20 15:59:54 +0200 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-20 15:59:54 +0200 |
| commit | 70ad01d793f88b1bef7d7988d24bff384ddbb3b9 (patch) | |
| tree | 10b4d6053d1cd58e921f71ff7b0d6f0cf7bab75a /opendc/opendc-experiments-sc20/src | |
| parent | ee494d6ce6f817cf4e9ab0dba0d9f9f1987c0029 (diff) | |
| parent | 21eafd32c45495ab9e8ebbeffbdbe1d43ffe566b (diff) | |
Merge branch 'perf/batch-slices' into '2.x'
Batch VM slices
See merge request opendc/opendc-simulator!70
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
4 files changed, 151 insertions, 48 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 83952d43..9d2b0247 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -172,7 +172,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp .onEach { event -> when (event) { is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( - simulationContext.clock.millis(), + clock.millis(), event.requestedBurst, event.grantedBurst, event.overcommissionedBurst, @@ -242,10 +242,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP // Monitor server events server.events .onEach { - val time = simulationContext.clock.millis() - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(time, it.server) + monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) } delay(1) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index 138905a4..7f71eb3e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.experiments.sc20.experiment.monitor import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import com.atlarge.opendc.experiments.sc20.experiment.Run @@ -54,9 +53,17 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"), run.parent.parent.parent.bufferSize ) - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) {} + override fun reportVmStateChange(time: Long, server: Server) { + if (startTime < 0) { + startTime = time + + // Update timestamp of initial event + currentHostEvent.replaceAll { k, v -> v.copy(timestamp = startTime) } + } + } override fun reportHostStateChange( time: Long, @@ -65,27 +72,31 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { ) { logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = time - lastServerState.second - reportHostSlice( - time, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - duration - ) + val previousEvent = currentHostEvent[server] - lastServerStates.remove(server) - lastPowerConsumption.remove(server) - } else { - lastServerStates[server] = Pair(server.state, time) - } + val roundedTime = previousEvent?.let { + val duration = time - it.timestamp + val k = 5 * 60 * 1000L // 5 min in ms + val rem = duration % k + + if (rem == 0L) { + time + } else { + it.timestamp + duration + k - rem + } + } ?: time + + reportHostSlice( + roundedTime, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server + ) } private val lastPowerConsumption = mutableMapOf<Server, Double>() @@ -106,23 +117,62 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { hostServer: Server, duration: Long ) { - lastServerStates[hostServer] = Pair(hostServer.state, time) + val previousEvent = currentHostEvent[hostServer] + when { + previousEvent == null -> { + val event = HostEvent( + time, + 5 * 60 * 1000L, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) - hostWriter.write( - HostEvent( - time, - duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 - ) - ) + currentHostEvent[hostServer] = event + } + previousEvent.timestamp == time -> { + val event = HostEvent( + time, + previousEvent.duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + + currentHostEvent[hostServer] = event + } + else -> { + hostWriter.write(previousEvent) + + val event = HostEvent( + time, + time - previousEvent.timestamp, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + + currentHostEvent[hostServer] = event + } + } } override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { @@ -141,6 +191,12 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { } override fun close() { + // Flush remaining events + for ((_, event) in currentHostEvent) { + hostWriter.write(event) + } + currentHostEvent.clear() + hostWriter.close() provisionerWriter.close() } diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index abd5c961..68c2cbc5 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -47,9 +47,11 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll import java.io.File import java.util.ServiceLoader @@ -134,6 +136,7 @@ class Sc20IntegrationTest { failureDomain?.cancel() scheduler.terminate() + monitor.close() } runSimulation() @@ -147,6 +150,48 @@ class Sc20IntegrationTest { assertEquals(0, monitor.totalInterferedBurst) } + @Test + fun small() { + val seed = 1 + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader(0.5, seed) + val environmentReader = createTestEnvironmentReader("single") + lateinit var scheduler: SimpleVirtProvisioningService + + root.launch { + val res = createProvisioner( + root, + environmentReader, + allocationPolicy + ) + scheduler = res.second + + attachMonitor(scheduler, monitor) + processTrace( + traceReader, + scheduler, + chan, + monitor + ) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + scheduler.terminate() + monitor.close() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(96344114723, monitor.totalRequestedBurst) }, + { assertEquals(96324378235, monitor.totalGrantedBurst) }, + { assertEquals(19736424, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) + } + /** * Run the simulation. */ @@ -157,20 +202,20 @@ class Sc20IntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(): TraceReader<VmWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> { return Sc20ParquetTraceReader( Sc20RawParquetTraceReader(File("src/test/resources/trace")), emptyMap(), - Workload("test", 1.0), - 0 + Workload("test", fraction), + seed ) } /** * Obtain the environment reader for the test. */ - private fun createTestEnvironmentReader(): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt") + private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") return Sc20ClusterEnvironmentReader(stream) } @@ -197,6 +242,7 @@ class Sc20IntegrationTest { totalOvercommissionedBurst += overcommissionedBurst totalInterferedBurst += interferedBurst } + override fun close() {} } } diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt new file mode 100644 index 00000000..53b3c2d7 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt @@ -0,0 +1,3 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;8;3.2;64;1;64;8 + |
