summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20/src
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-05-20 15:59:54 +0200
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-05-20 15:59:54 +0200
commit70ad01d793f88b1bef7d7988d24bff384ddbb3b9 (patch)
tree10b4d6053d1cd58e921f71ff7b0d6f0cf7bab75a /opendc/opendc-experiments-sc20/src
parentee494d6ce6f817cf4e9ab0dba0d9f9f1987c0029 (diff)
parent21eafd32c45495ab9e8ebbeffbdbe1d43ffe566b (diff)
Merge branch 'perf/batch-slices' into '2.x'
Batch VM slices See merge request opendc/opendc-simulator!70
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt6
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt134
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt56
-rw-r--r--opendc/opendc-experiments-sc20/src/test/resources/env/single.txt3
4 files changed, 151 insertions, 48 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 83952d43..9d2b0247 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -172,7 +172,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
.onEach { event ->
when (event) {
is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
- simulationContext.clock.millis(),
+ clock.millis(),
event.requestedBurst,
event.grantedBurst,
event.overcommissionedBurst,
@@ -242,10 +242,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
// Monitor server events
server.events
.onEach {
- val time = simulationContext.clock.millis()
-
if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(time, it.server)
+ monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
}
delay(1)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index 138905a4..7f71eb3e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.experiments.sc20.experiment.monitor
import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import com.atlarge.opendc.experiments.sc20.experiment.Run
@@ -54,9 +53,17 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"),
run.parent.parent.parent.bufferSize
)
- private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
+ private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server) {}
+ override fun reportVmStateChange(time: Long, server: Server) {
+ if (startTime < 0) {
+ startTime = time
+
+ // Update timestamp of initial event
+ currentHostEvent.replaceAll { k, v -> v.copy(timestamp = startTime) }
+ }
+ }
override fun reportHostStateChange(
time: Long,
@@ -65,27 +72,31 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
) {
logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
- val lastServerState = lastServerStates[server]
- if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = time - lastServerState.second
- reportHostSlice(
- time,
- 0,
- 0,
- 0,
- 0,
- 0.0,
- 0.0,
- 0,
- server,
- duration
- )
+ val previousEvent = currentHostEvent[server]
- lastServerStates.remove(server)
- lastPowerConsumption.remove(server)
- } else {
- lastServerStates[server] = Pair(server.state, time)
- }
+ val roundedTime = previousEvent?.let {
+ val duration = time - it.timestamp
+ val k = 5 * 60 * 1000L // 5 min in ms
+ val rem = duration % k
+
+ if (rem == 0L) {
+ time
+ } else {
+ it.timestamp + duration + k - rem
+ }
+ } ?: time
+
+ reportHostSlice(
+ roundedTime,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server
+ )
}
private val lastPowerConsumption = mutableMapOf<Server, Double>()
@@ -106,23 +117,62 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
hostServer: Server,
duration: Long
) {
- lastServerStates[hostServer] = Pair(hostServer.state, time)
+ val previousEvent = currentHostEvent[hostServer]
+ when {
+ previousEvent == null -> {
+ val event = HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
- hostWriter.write(
- HostEvent(
- time,
- duration,
- hostServer,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0
- )
- )
+ currentHostEvent[hostServer] = event
+ }
+ previousEvent.timestamp == time -> {
+ val event = HostEvent(
+ time,
+ previousEvent.duration,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ else -> {
+ hostWriter.write(previousEvent)
+
+ val event = HostEvent(
+ time,
+ time - previousEvent.timestamp,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ }
}
override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
@@ -141,6 +191,12 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
}
override fun close() {
+ // Flush remaining events
+ for ((_, event) in currentHostEvent) {
+ hostWriter.write(event)
+ }
+ currentHostEvent.clear()
+
hostWriter.close()
provisionerWriter.close()
}
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index abd5c961..68c2cbc5 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -47,9 +47,11 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
import java.io.File
import java.util.ServiceLoader
@@ -134,6 +136,7 @@ class Sc20IntegrationTest {
failureDomain?.cancel()
scheduler.terminate()
+ monitor.close()
}
runSimulation()
@@ -147,6 +150,48 @@ class Sc20IntegrationTest {
assertEquals(0, monitor.totalInterferedBurst)
}
+ @Test
+ fun small() {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val traceReader = createTestTraceReader(0.5, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+ lateinit var scheduler: SimpleVirtProvisioningService
+
+ root.launch {
+ val res = createProvisioner(
+ root,
+ environmentReader,
+ allocationPolicy
+ )
+ scheduler = res.second
+
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ scheduler.terminate()
+ monitor.close()
+ }
+
+ runSimulation()
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(96344114723, monitor.totalRequestedBurst) },
+ { assertEquals(96324378235, monitor.totalGrantedBurst) },
+ { assertEquals(19736424, monitor.totalOvercommissionedBurst) },
+ { assertEquals(0, monitor.totalInterferedBurst) }
+ )
+ }
+
/**
* Run the simulation.
*/
@@ -157,20 +202,20 @@ class Sc20IntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(): TraceReader<VmWorkload> {
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
return Sc20ParquetTraceReader(
Sc20RawParquetTraceReader(File("src/test/resources/trace")),
emptyMap(),
- Workload("test", 1.0),
- 0
+ Workload("test", fraction),
+ seed
)
}
/**
* Obtain the environment reader for the test.
*/
- private fun createTestEnvironmentReader(): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt")
+ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
return Sc20ClusterEnvironmentReader(stream)
}
@@ -197,6 +242,7 @@ class Sc20IntegrationTest {
totalOvercommissionedBurst += overcommissionedBurst
totalInterferedBurst += interferedBurst
}
+
override fun close() {}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt
new file mode 100644
index 00000000..53b3c2d7
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/test/resources/env/single.txt
@@ -0,0 +1,3 @@
+ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
+A01;A01;8;3.2;64;1;64;8
+