diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 21:14:20 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 23:40:57 +0200 |
| commit | c41d201343263346ac84855a0b2254051ed33c21 (patch) | |
| tree | 9141a382f9e1b2d924e9a191e53cc6daa9107563 /simulator/opendc/opendc-compute/src | |
| parent | c543f55e961f9f7468e19c1c0f5f20566d07dfb5 (diff) | |
Eliminate use of Domain and simulationContext in OpenDC
This change takes the first step in eliminating the explict use of
Domain and simulationContext from OpenDC. In this way, we decouple the
logic of various datacenter services from simulation logic, which should
promote re-use.
Diffstat (limited to 'simulator/opendc/opendc-compute/src')
16 files changed, 237 insertions, 198 deletions
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index fd0fc836..01968cd8 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A server instance that is running on some physical or virtual machine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index f770fa49..817bee4b 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select +import java.time.Clock /** * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** + * The virtual clock. + */ + public val clock: Clock + + /** * The server on which the image runs. */ public val server: Server diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index c615d865..0e1af093 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.compute.core.image -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import java.util.UUID @@ -16,8 +15,7 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - val clock = simulationContext.clock - var offset = clock.millis() + var offset = ctx.clock.millis() val batch = flopsHistory.map { fragment -> val cores = min(fragment.cores, ctx.server.flavor.cpuCount) diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index cb637aea..7cb4c0c5 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 17d8ee53..41cec291 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for the management interface of a bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index a453e459..c118cc3d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.flow.StateFlow import com.atlarge.opendc.compute.core.Flavor @@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.lang.Exception -import java.time.Clock -import java.util.UUID -import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle @@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.withContext +import java.lang.Exception +import java.time.Clock +import java.util.UUID +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. * - * @param domain The simulation domain the driver runs in. + * @param coroutineScope The [CoroutineScope] the driver runs in. + * @param clock The virtual clock to keep track of time. * @param uid The unique identifier of the machine. * @param name An optional name of the machine. * @param metadata The initial metadata of the node. @@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext */ @OptIn(ExperimentalCoroutinesApi::class) public class SimpleBareMetalDriver( - private val domain: Domain, + private val coroutineScope: CoroutineScope, + private val clock: Clock, uid: UUID, name: String, metadata: Map<String, Any>, @@ -129,14 +128,14 @@ public class SimpleBareMetalDriver( */ private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - override suspend fun init(): Node = withContext(domain.coroutineContext) { - nodeState.value + override suspend fun init(): Node { + return nodeState.value } - override suspend fun start(): Node = withContext(domain.coroutineContext) { + override suspend fun start(): Node { val node = nodeState.value if (node.state != NodeState.SHUTOFF) { - return@withContext node + return node } val events = EventFlow<ServerEvent>() @@ -153,13 +152,13 @@ public class SimpleBareMetalDriver( setNode(node.copy(state = NodeState.BOOT, server = server)) serverContext = BareMetalServerContext(events) - return@withContext nodeState.value + return nodeState.value } - override suspend fun stop(): Node = withContext(domain.coroutineContext) { + override suspend fun stop(): Node { val node = nodeState.value if (node.state == NodeState.SHUTOFF) { - return@withContext node + return node } // We terminate the image running on the machine @@ -167,20 +166,20 @@ public class SimpleBareMetalDriver( serverContext = null setNode(node.copy(state = NodeState.SHUTOFF, server = null)) - return@withContext node + return node } - override suspend fun reboot(): Node = withContext(domain.coroutineContext) { + override suspend fun reboot(): Node { stop() - start() + return start() } - override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun setImage(image: Image): Node { setNode(nodeState.value.copy(image = image)) - return@withContext nodeState.value + return nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } + override suspend fun refresh(): Node = nodeState.value private fun setNode(value: Node) { val field = nodeState.value @@ -207,7 +206,10 @@ public class SimpleBareMetalDriver( override val server: Server get() = nodeState.value.server!! - private val job = domain.launch { + override val clock: Clock + get() = this@SimpleBareMetalDriver.clock + + private val job = coroutineScope.launch { delay(1) // TODO Introduce boot time init() try { @@ -265,18 +267,13 @@ public class SimpleBareMetalDriver( private var usageFlush: DisposableHandle? = null /** - * Cache the [Clock] for timing. - */ - private val clock = domain.coroutineContext[SimulationContext]!!.clock - - /** * Cache the [Delay] instance for timing. * * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. * XXX Note however that this is an ugly hack which may break in the future. */ @OptIn(InternalCoroutinesApi::class) - private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay + private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay @OptIn(InternalCoroutinesApi::class) override fun onRun( @@ -353,10 +350,10 @@ public class SimpleBareMetalDriver( currentDisposable?.dispose() // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1, Runnable { + usageFlush = delay.invokeOnTimeout(1) { usageState.value = 0.0 usageFlush = null - }) + } } select.disposeOnSelect(disposable) @@ -458,7 +455,7 @@ public class SimpleBareMetalDriver( } override val scope: CoroutineScope - get() = domain + get() = coroutineScope override suspend fun fail() { serverContext?.unavailable = true diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index f6b236ae..f64f9b5a 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -24,44 +24,41 @@ package com.atlarge.opendc.compute.metal.service -import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService(val domain: Domain) : ProvisioningService { +public class SimpleProvisioningService : ProvisioningService { /** * The active nodes in this service. */ private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { + override suspend fun create(driver: BareMetalDriver): Node { val node = driver.init() nodes[node] = driver - return@withContext node + return node } - override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys } + override suspend fun nodes(): Set<Node> = nodes.keys - override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) { - return@withContext nodes[node]!!.refresh() + override suspend fun refresh(node: Node): Node { + return nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun deploy(node: Node, image: Image): Node { val driver = nodes[node]!! driver.setImage(image) - val newNode = driver.reboot() - return@withContext newNode + return driver.reboot() } - override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) { + override suspend fun stop(node: Node): Node { val driver = nodes[node]!! - try { + return try { driver.stop() } catch (e: CancellationException) { node diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt index 1e7e351f..69b0124d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 607759a8..bd395f0d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.core.resource.TagContainer -import java.util.UUID import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine +import java.util.UUID /** * A hypervisor managing the VMs of a node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 192db413..df45f440 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server @@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DisposableHandle @@ -59,6 +54,17 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select +import mu.KotlinLogging +import java.time.Clock +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} /** * A [VirtDriver] that is backed by a simple hypervisor implementation. @@ -97,7 +103,7 @@ class SimpleVirtDriver( scheduler() } catch (e: Exception) { if (e !is CancellationException) { - simulationContext.log.error("Hypervisor scheduler failed", e) + logger.error("Hypervisor scheduler failed", e) } throw e } @@ -117,8 +123,14 @@ class SimpleVirtDriver( val events = EventFlow<ServerEvent>() val server = Server( - UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD, - ServiceRegistry(), events + UUID.randomUUID(), + name, + emptyMap(), + flavor, + image, + ServerState.BUILD, + ServiceRegistry(), + events ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events)) @@ -181,7 +193,7 @@ class SimpleVirtDriver( * The scheduling process of the hypervisor. */ private suspend fun scheduler() { - val clock = simulationContext.clock + val clock = hostContext.clock val maxUsage = hostContext.cpus.sumByDouble { it.frequency } val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } @@ -561,6 +573,9 @@ class SimpleVirtDriver( override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + override val clock: Clock + get() = hostContext.clock + init { vm = Vm(this) } diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index b1844f67..1002d382 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for a hypervisor running on some host server and communicating with the central compute service to diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 79388bc3..6b2cfc40 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,8 +1,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent @@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.math.max -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext import mu.KotlinLogging +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.math.max private val logger = KotlinLogging.logger {} @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( - public override val allocationPolicy: AllocationPolicy, - private val ctx: SimulationContext, - private val provisioningService: ProvisioningService -) : VirtProvisioningService, CoroutineScope by ctx.domain { + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val provisioningService: ProvisioningService, + override val allocationPolicy: AllocationPolicy +) : VirtProvisioningService { /** * The hypervisors that have been launched by the service. */ @@ -59,11 +53,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - public var submittedVms = 0 - public var queuedVms = 0 - public var runningVms = 0 - public var finishedVms = 0 - public var unscheduledVms = 0 + var submittedVms = 0 + var queuedVms = 0 + var runningVms = 0 + var finishedVms = 0 + var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -81,7 +75,7 @@ class SimpleVirtProvisioningService( override val events: Flow<VirtProvisioningEvent> = eventFlow init { - launch { + coroutineScope.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage @@ -96,27 +90,29 @@ class SimpleVirtProvisioningService( } } - override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) { - availableHypervisors.map { it.driver }.toSet() + override suspend fun drivers(): Set<VirtDriver> { + return availableHypervisors.map { it.driver }.toSet() } override suspend fun deploy( name: String, image: Image, flavor: Flavor - ): Server = withContext(coroutineContext) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - )) - - suspendCancellableCoroutine<Server> { cont -> + ): Server { + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + return suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance requestCycle() @@ -139,9 +135,9 @@ class SimpleVirtProvisioningService( // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = quantum - (ctx.clock.millis() % quantum) + val delay = quantum - (clock.millis() % quantum) - val call = launch { + val call = coroutineScope.launch { delay(delay) this@SimpleVirtProvisioningService.call = null schedule() @@ -150,7 +146,6 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { @@ -159,16 +154,18 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - ++unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + ) + ) incomingImages -= imageInstance @@ -180,7 +177,7 @@ class SimpleVirtProvisioningService( } try { - logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } + logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -197,16 +194,18 @@ class SimpleVirtProvisioningService( imageInstance.server = server imageInstance.continuation.resume(server) - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) activeImages += imageInstance server.events @@ -214,18 +213,20 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - )) + logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } + + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -238,7 +239,7 @@ class SimpleVirtProvisioningService( } } } - .launchIn(this) + .launchIn(coroutineScope) } catch (e: InsufficientMemoryOnServerException) { logger.error("Failed to deploy VM", e) @@ -254,7 +255,7 @@ class SimpleVirtProvisioningService( private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" } if (server in hypervisors) { // Corner case for when the hypervisor already exists @@ -272,16 +273,18 @@ class SimpleVirtProvisioningService( hypervisors[server] = hv } - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) // Re-schedule on the new machine if (incomingImages.isNotEmpty()) { @@ -289,20 +292,22 @@ class SimpleVirtProvisioningService( } } ServerState.SHUTOFF, ServerState.ERROR -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } val hv = hypervisors[server] ?: return availableHypervisors -= hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) if (incomingImages.isNotEmpty()) { requestCycle() @@ -318,16 +323,18 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) hv.driver.events .onEach { event -> @@ -335,7 +342,7 @@ class SimpleVirtProvisioningService( hv.numberOfActiveServers = event.numberOfActiveServers hv.availableMemory = event.availableMemory } - }.launchIn(this) + }.launchIn(coroutineScope) requestCycle() } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt index 1c7b751c..417db77d 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.compute.core.image -import java.util.UUID import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import java.util.UUID /** * Test suite for [FlopsApplicationImage] diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index af9d3421..80c9c547 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID internal class SimpleBareMetalDriverTest { /** @@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index ed2256c0..37cd5898 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -25,17 +25,18 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID /** * Test suite for the [SimpleProvisioningService]. @@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest { val system = provider("sim") val root = system.newDomain(name = "root") root.launch { + val clock = simulationContext.clock val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - val provisioner = SimpleProvisioningService(dom) + val provisioner = SimpleProvisioningService() provisioner.create(driver) delay(5) val nodes = provisioner.nodes() diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 622b185e..528434b1 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.virt import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn @@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import java.util.ServiceLoader +import java.util.UUID /** * Basic test-suite for the hypervisor. @@ -62,6 +63,7 @@ internal class HypervisorTest { val root = system.newDomain("root") root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) @@ -70,7 +72,7 @@ internal class HypervisorTest { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -108,26 +110,41 @@ internal class HypervisorTest { var overcommissionedBurst = 0L root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val duration = 5 * 60L - val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) - ), 2, 0) - val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) - ), 2, 0) + val vmImageA = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), + 2, + 0 + ) + val vmImageB = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) + ), + 2, + 0 + ) val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) |
