From a5d22796a95b187bc07cbd55a2289185bd9092b8 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 16 Mar 2020 22:42:58 +0100 Subject: feat: Track VM failures in SC20 experiment --- .../experiments/sc20/Sc20HypervisorMonitor.kt | 40 ------------------ .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 49 ++++++++++++++++++++++ .../opendc/experiments/sc20/TestExperiment.kt | 14 ++----- 3 files changed, 52 insertions(+), 51 deletions(-) delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt deleted file mode 100644 index 9e8f0fa8..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt +++ /dev/null @@ -1,40 +0,0 @@ -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor -import kotlinx.coroutines.flow.first -import java.io.BufferedWriter -import java.io.Closeable -import java.io.FileWriter - -class Sc20HypervisorMonitor( - destination: String -) : HypervisorMonitor, Closeable { - private val outputFile = BufferedWriter(FileWriter(destination)) - - init { - outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw\n") - } - - override suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - numberOfDeployedImages: Int, - hostServer: Server - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.serviceRegistry[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - - outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw") - outputFile.newLine() - } - - override fun close() { - outputFile.flush() - outputFile.close() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt new file mode 100644 index 00000000..6ce9cefa --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -0,0 +1,49 @@ +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.flow.first +import java.io.BufferedWriter +import java.io.Closeable +import java.io.FileWriter + +class Sc20Monitor( + destination: String +) : HypervisorMonitor, ServerMonitor, Closeable { + private val outputFile = BufferedWriter(FileWriter(destination)) + private var failed: Int = 0 + + init { + outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw\n") + } + + override suspend fun onUpdate(server: Server, previousState: ServerState) { + if (server.state == ServerState.ERROR) { + failed++ + } + } + + override suspend fun onSliceFinish( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + numberOfDeployedImages: Int, + hostServer: Server + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.serviceRegistry[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed") + outputFile.newLine() + } + + override fun close() { + outputFile.flush() + outputFile.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index f0d3fc8d..e47438f0 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -27,9 +27,6 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy @@ -85,12 +82,7 @@ class ExperimentParameters(parser: ArgParser) { */ fun main(args: Array) { ArgParser(args).parseInto(::ExperimentParameters).run { - val hypervisorMonitor = Sc20HypervisorMonitor(outputFile) - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println(server) - } - } + val monitor = Sc20Monitor(outputFile) val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") @@ -115,7 +107,7 @@ fun main(args: Array) { AvailableMemoryAllocationPolicy(), simulationContext, environment.platforms[0].zones[0].services[ProvisioningService.Key], - hypervisorMonitor + monitor ) val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) @@ -134,6 +126,6 @@ fun main(args: Array) { } // Explicitly close the monitor to flush its buffer - hypervisorMonitor.close() + monitor.close() } } -- cgit v1.2.3 From 5ff443c799322836d532fffb3ff8f720806c32b6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 16 Mar 2020 23:07:22 +0100 Subject: feat: Add failures to SC20 experiment --- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 1 + .../opendc/experiments/sc20/TestExperiment.kt | 21 ++++++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 6ce9cefa..d3b2d5c6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,5 +1,6 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index e47438f0..a1619fe2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -30,6 +30,8 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.UncorrelatedFaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -37,11 +39,11 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File -import java.io.FileInputStream import java.io.FileReader import java.util.ServiceLoader import kotlin.math.max @@ -88,12 +90,14 @@ fun main(args: Array) { val system = provider("test") val root = system.newDomain("root") + val chan = Channel(Channel.CONFLATED) + root.launch { val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) .use { it.construct(root) } val performanceInterferenceStream = if (performanceInterferenceFile != null) { - FileInputStream(File(performanceInterferenceFile!!)) + File(performanceInterferenceFile!!).inputStream().buffered() } else { object {}.javaClass.getResourceAsStream("/env/performance-interference.json") } @@ -103,17 +107,28 @@ fun main(args: Array) { println(simulationContext.clock.instant()) + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - environment.platforms[0].zones[0].services[ProvisioningService.Key], + bareMetalProvisioner, monitor ) + root.launch { + chan.receive() + val faultInjector = UncorrelatedFaultInjector(mu = 2e7) + for (node in bareMetalProvisioner.nodes()) { + faultInjector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) + chan.send(Unit) scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) } -- cgit v1.2.3 From b3e8e3d196de8b8c1bb904bfb3c6641415cf72bb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Mar 2020 15:52:10 +0100 Subject: feat: Use Weilbull distribution for failures --- .../main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 3 ++- .../main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index d3b2d5c6..40cb9719 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -18,10 +18,11 @@ class Sc20Monitor( private var failed: Int = 0 init { - outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw\n") + outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } override suspend fun onUpdate(server: Server, previousState: ServerState) { + println("${simulationContext.clock.instant()} ${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index a1619fe2..69174f0f 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -89,7 +89,6 @@ fun main(args: Array) { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") val root = system.newDomain("root") - val chan = Channel(Channel.CONFLATED) root.launch { @@ -118,7 +117,8 @@ fun main(args: Array) { root.launch { chan.receive() - val faultInjector = UncorrelatedFaultInjector(mu = 2e7) + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + val faultInjector = UncorrelatedFaultInjector(alpha = 9.66772, beta = 12.23796) for (node in bareMetalProvisioner.nodes()) { faultInjector.enqueue(node.metadata["driver"] as FailureDomain) } -- cgit v1.2.3 From 6b10881f123f5e6a8e7bce1045d02eba5e48c3a2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Mar 2020 17:37:41 +0100 Subject: [ci skip] feat: Add support for correlated failures --- .../com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 69174f0f..09b6592e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.UncorrelatedFaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -39,6 +39,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -115,10 +116,14 @@ fun main(args: Array) { monitor ) - root.launch { + val faultInjectorDomain = root.newDomain(name = "failures") + faultInjectorDomain.launch { chan.receive() // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - val faultInjector = UncorrelatedFaultInjector(alpha = 9.66772, beta = 12.23796) + val faultInjector = CorrelatedFaultInjector(faultInjectorDomain, + iatScale = -1.39, iatShape = 1.03, + sizeScale = 1.88, sizeShape = 1.25 + ) for (node in bareMetalProvisioner.nodes()) { faultInjector.enqueue(node.metadata["driver"] as FailureDomain) } -- cgit v1.2.3 From b1cf9b2bd9559328c3c9d26e73123e67d2bfea05 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Mar 2020 22:26:15 +0100 Subject: refactor: Rework monitor interfaces --- .../main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 7 +++---- .../kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 40cb9719..0f4d0c1b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -21,8 +20,8 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("${simulationContext.clock.instant()} ${server.uid} ${server.state}") + override fun stateChanged(server: Server, previousState: ServerState) { + println("${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } @@ -36,7 +35,7 @@ class Sc20Monitor( hostServer: Server ) { // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.serviceRegistry[BareMetalDriver.Key] + val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 09b6592e..efc85653 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -39,7 +39,6 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default -import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch -- cgit v1.2.3 From bc64182612ad06f15bff5b48637ed7d241e293b2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Mar 2020 00:50:05 +0100 Subject: [ci skip] refactor: Refactor monitors into EventFlow --- .../main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index efc85653..4273c39e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -112,7 +112,6 @@ fun main(args: Array) { AvailableMemoryAllocationPolicy(), simulationContext, bareMetalProvisioner, - monitor ) val faultInjectorDomain = root.newDomain(name = "failures") @@ -133,7 +132,7 @@ fun main(args: Array) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) chan.send(Unit) - scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) + scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) } println(simulationContext.clock.instant()) -- cgit v1.2.3 From 76bfeb44c5a02be143c152c52bc1029cff360744 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 21 Mar 2020 22:04:31 +0100 Subject: refactor: Migrate to Flow for event listeners --- .../com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 8 +++----- .../com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 13 +++++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 0f4d0c1b..e18bbe30 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -2,9 +2,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.flow.first import java.io.BufferedWriter import java.io.Closeable @@ -12,7 +10,7 @@ import java.io.FileWriter class Sc20Monitor( destination: String -) : HypervisorMonitor, ServerMonitor, Closeable { +) : Closeable { private val outputFile = BufferedWriter(FileWriter(destination)) private var failed: Int = 0 @@ -20,14 +18,14 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override fun stateChanged(server: Server, previousState: ServerState) { + fun stateChanged(server: Server) { println("${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } } - override suspend fun onSliceFinish( + suspend fun onSliceFinish( time: Long, requestedBurst: Long, grantedBurst: Long, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 4273c39e..96033ea7 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy @@ -41,6 +42,8 @@ import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -107,11 +110,10 @@ fun main(args: Array) { println(simulationContext.clock.instant()) val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - bareMetalProvisioner, + bareMetalProvisioner ) val faultInjectorDomain = root.newDomain(name = "failures") @@ -131,8 +133,11 @@ fun main(args: Array) { while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) - chan.send(Unit) - scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + launch { + chan.send(Unit) + val server = scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() + } } println(simulationContext.clock.instant()) -- cgit v1.2.3 From edce7993772182bac0d0c74d22189137b35872aa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 24 Mar 2020 20:36:54 +0100 Subject: refactor: Add support for specifying VM name --- .../kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 96033ea7..6d832ee4 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -135,7 +135,10 @@ fun main(args: Array) { delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) - val server = scheduler.deploy(workload.image, Flavor(workload.image.cores, workload.image.requiredMemory)) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.cores, workload.image.requiredMemory) + ) server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() } } -- cgit v1.2.3 From 225a9dd042870b1320681104aa022120611cc92b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 24 Mar 2020 22:04:06 +0100 Subject: feat: Record hypervisor events during experiment --- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 5 +++-- .../opendc/experiments/sc20/TestExperiment.kt | 21 +++++++++++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index e18bbe30..36da7703 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,5 +1,6 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver @@ -18,8 +19,8 @@ class Sc20Monitor( outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - fun stateChanged(server: Server) { - println("${server.uid} ${server.state}") + suspend fun stateChanged(server: Server) { + println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}") if (server.state == ServerState.ERROR) { failed++ } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 6d832ee4..0fafc118 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -29,10 +29,10 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -43,6 +43,7 @@ import com.xenomachina.argparser.default import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -116,6 +117,21 @@ fun main(args: Array) { bareMetalProvisioner ) + // Wait for the hypervisors to be spawned + delay(10) + + // Monitor hypervisor events + for (hypervisor in scheduler.drivers()) { + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer) + else -> println(event) + } + } + .launchIn(this) + } + val faultInjectorDomain = root.newDomain(name = "failures") faultInjectorDomain.launch { chan.receive() @@ -125,7 +141,7 @@ fun main(args: Array) { sizeScale = 1.88, sizeShape = 1.25 ) for (node in bareMetalProvisioner.nodes()) { - faultInjector.enqueue(node.metadata["driver"] as FailureDomain) + // faultInjector.enqueue(node.metadata["driver"] as FailureDomain) } } @@ -139,6 +155,7 @@ fun main(args: Array) { workload.image.name, workload.image, Flavor(workload.image.cores, workload.image.requiredMemory) ) + // Monitor server events server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() } } -- cgit v1.2.3 From 27a8f2312bf9207314abb201ed74f021b818f8af Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 24 Mar 2020 22:58:27 +0100 Subject: bug: Fix race condition in VM provisioner This change fixes a race condition in the VM provisioner where VMs were scheduled based on stale information. --- .../com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 0fafc118..66b20bff 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -40,6 +40,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -86,6 +87,7 @@ class ExperimentParameters(parser: ArgParser) { /** * Main entry point of the experiment. */ +@OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array) { ArgParser(args).parseInto(::ExperimentParameters).run { val monitor = Sc20Monitor(outputFile) @@ -111,6 +113,10 @@ fun main(args: Array) { println(simulationContext.clock.instant()) val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + + // Wait for the bare metal nodes to be spawned + delay(10) + val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, @@ -140,9 +146,9 @@ fun main(args: Array) { iatScale = -1.39, iatShape = 1.03, sizeScale = 1.88, sizeShape = 1.25 ) - for (node in bareMetalProvisioner.nodes()) { + // for (node in bareMetalProvisioner.nodes()) { // faultInjector.enqueue(node.metadata["driver"] as FailureDomain) - } + // } } val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) -- cgit v1.2.3 From 7eb8177e2278bde2c0f4fad00af6fdd2d632cb5b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 26 Mar 2020 12:37:54 +0100 Subject: feat: Implement correlated failures for individual clusters --- .../opendc/experiments/sc20/TestExperiment.kt | 33 +++++++++++++++------- 1 file changed, 23 insertions(+), 10 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 66b20bff..639c3aef 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -24,15 +24,19 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -84,6 +88,17 @@ class ExperimentParameters(parser: ArgParser) { } } +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + return CorrelatedFaultInjector(domain, + iatScale = -1.39, iatShape = 1.03, + sizeScale = 1.88, sizeShape = 1.25 + ) +} + /** * Main entry point of the experiment. */ @@ -138,17 +153,15 @@ fun main(args: Array) { .launchIn(this) } - val faultInjectorDomain = root.newDomain(name = "failures") - faultInjectorDomain.launch { + root.newDomain(name = "failures").launch { chan.receive() - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - val faultInjector = CorrelatedFaultInjector(faultInjectorDomain, - iatScale = -1.39, iatShape = 1.03, - sizeScale = 1.88, sizeShape = 1.25 - ) - // for (node in bareMetalProvisioner.nodes()) { - // faultInjector.enqueue(node.metadata["driver"] as FailureDomain) - // } + val injectors = mutableMapOf() + + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } } val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) -- cgit v1.2.3