summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt26
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt10
2 files changed, 32 insertions, 4 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index fb874e22..156521db 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -6,7 +6,9 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException
@@ -15,7 +17,8 @@ import com.atlarge.opendc.core.services.ServiceKey
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
-import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
@@ -60,7 +63,7 @@ class SimpleVirtProvisioningService(
is ServerEvent.StateChanged -> stateChanged(event.server)
is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
}
- }.collect()
+ }.launchIn(this)
}
}
}
@@ -89,6 +92,7 @@ class SimpleVirtProvisioningService(
}
val call = launch {
+ delay(1)
this@SimpleVirtProvisioningService.call = null
schedule()
}
@@ -103,6 +107,12 @@ class SimpleVirtProvisioningService(
try {
println("Spawning ${imageInstance.image}")
incomingImages -= imageInstance
+
+ // Speculatively update the hypervisor view information to prevent other images in the queue from
+ // deciding on stale values.
+ selectedHv.numberOfActiveServers++
+ selectedHv.availableMemory -= (imageInstance.image as VmImage).requiredMemory // XXX Temporary hack
+
val server = selectedHv.driver.spawn(
imageInstance.name,
imageInstance.image,
@@ -113,6 +123,9 @@ class SimpleVirtProvisioningService(
activeImages += imageInstance
} catch (e: InsufficientMemoryOnServerException) {
println("Unable to deploy image due to insufficient memory")
+
+ selectedHv.numberOfActiveServers--
+ selectedHv.availableMemory += (imageInstance.image as VmImage).requiredMemory
}
}
}
@@ -141,6 +154,15 @@ class SimpleVirtProvisioningService(
val hv = hypervisors[server] ?: return
hv.driver = server.services[VirtDriver]
availableHypervisors += hv
+
+ hv.driver.events
+ .onEach { event ->
+ if (event is HypervisorEvent.VmsUpdated) {
+ hv.numberOfActiveServers = event.numberOfActiveServers
+ hv.availableMemory = event.availableMemory
+ }
+ }.launchIn(this)
+
requestCycle()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 0fafc118..66b20bff 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -86,6 +87,7 @@ class ExperimentParameters(parser: ArgParser) {
/**
* Main entry point of the experiment.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
ArgParser(args).parseInto(::ExperimentParameters).run {
val monitor = Sc20Monitor(outputFile)
@@ -111,6 +113,10 @@ fun main(args: Array<String>) {
println(simulationContext.clock.instant())
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
val scheduler = SimpleVirtProvisioningService(
AvailableMemoryAllocationPolicy(),
simulationContext,
@@ -140,9 +146,9 @@ fun main(args: Array<String>) {
iatScale = -1.39, iatShape = 1.03,
sizeScale = 1.88, sizeShape = 1.25
)
- for (node in bareMetalProvisioner.nodes()) {
+ // for (node in bareMetalProvisioner.nodes()) {
// faultInjector.enqueue(node.metadata["driver"] as FailureDomain)
- }
+ // }
}
val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList())