diff options
Diffstat (limited to 'opendc')
2 files changed, 32 insertions, 4 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index fb874e22..156521db 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -6,7 +6,9 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.HypervisorImage import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException @@ -15,7 +17,8 @@ import com.atlarge.opendc.core.services.ServiceKey import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine @@ -60,7 +63,7 @@ class SimpleVirtProvisioningService( is ServerEvent.StateChanged -> stateChanged(event.server) is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) } - }.collect() + }.launchIn(this) } } } @@ -89,6 +92,7 @@ class SimpleVirtProvisioningService( } val call = launch { + delay(1) this@SimpleVirtProvisioningService.call = null schedule() } @@ -103,6 +107,12 @@ class SimpleVirtProvisioningService( try { println("Spawning ${imageInstance.image}") incomingImages -= imageInstance + + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + selectedHv.numberOfActiveServers++ + selectedHv.availableMemory -= (imageInstance.image as VmImage).requiredMemory // XXX Temporary hack + val server = selectedHv.driver.spawn( imageInstance.name, imageInstance.image, @@ -113,6 +123,9 @@ class SimpleVirtProvisioningService( activeImages += imageInstance } catch (e: InsufficientMemoryOnServerException) { println("Unable to deploy image due to insufficient memory") + + selectedHv.numberOfActiveServers-- + selectedHv.availableMemory += (imageInstance.image as VmImage).requiredMemory } } } @@ -141,6 +154,15 @@ class SimpleVirtProvisioningService( val hv = hypervisors[server] ?: return hv.driver = server.services[VirtDriver] availableHypervisors += hv + + hv.driver.events + .onEach { event -> + if (event is HypervisorEvent.VmsUpdated) { + hv.numberOfActiveServers = event.numberOfActiveServers + hv.availableMemory = event.availableMemory + } + }.launchIn(this) + requestCycle() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 0fafc118..66b20bff 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -40,6 +40,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -86,6 +87,7 @@ class ExperimentParameters(parser: ArgParser) { /** * Main entry point of the experiment. */ +@OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array<String>) { ArgParser(args).parseInto(::ExperimentParameters).run { val monitor = Sc20Monitor(outputFile) @@ -111,6 +113,10 @@ fun main(args: Array<String>) { println(simulationContext.clock.instant()) val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + + // Wait for the bare metal nodes to be spawned + delay(10) + val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, @@ -140,9 +146,9 @@ fun main(args: Array<String>) { iatScale = -1.39, iatShape = 1.03, sizeScale = 1.88, sizeShape = 1.25 ) - for (node in bareMetalProvisioner.nodes()) { + // for (node in bareMetalProvisioner.nodes()) { // faultInjector.enqueue(node.metadata["driver"] as FailureDomain) - } + // } } val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) |
