Diffstat (limited to 'simulator/opendc/opendc-compute/src')
16 files changed, 237 insertions, 198 deletions
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index fd0fc836..01968cd8 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A server instance that is running on some physical or virtual machine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index f770fa49..817bee4b 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select +import java.time.Clock /** * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** + * The virtual clock. + */ + public val clock: Clock + + /** * The server on which the image runs. */ public val server: Server diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index c615d865..0e1af093 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.compute.core.image -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import java.util.UUID @@ -16,8 +15,7 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - val clock = simulationContext.clock - var offset = clock.millis() + var offset = ctx.clock.millis() val batch = flopsHistory.map { fragment -> val cores = min(fragment.cores, ctx.server.flavor.cpuCount) diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index cb637aea..7cb4c0c5 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A bare-metal compute node. 
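For reference, the ServerContext change above means a bootable Image can read simulated time from the context it is invoked with rather than from the global simulationContext. The sketch below only illustrates that pattern; the Demo* interfaces and TimestampedImage are stand-ins for the real ServerContext and Image types, and nothing beyond the clock property is taken from this diff.

import java.time.Clock

// Illustrative stand-ins for the interfaces changed above.
interface DemoServerContext { val clock: Clock }
interface DemoImage { suspend fun invoke(ctx: DemoServerContext) }

class TimestampedImage : DemoImage {
    override suspend fun invoke(ctx: DemoServerContext) {
        // Virtual time now comes from the injected context, as VmImage does above.
        val start = ctx.clock.millis()
        // ... replay the simulated workload here ...
        println("image started at t=$start (virtual ms)")
    }
}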
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 17d8ee53..41cec291 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for the management interface of a bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index a453e459..c118cc3d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.flow.StateFlow import com.atlarge.opendc.compute.core.Flavor @@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.lang.Exception -import java.time.Clock -import java.util.UUID -import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle @@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.withContext +import java.lang.Exception +import java.time.Clock +import java.util.UUID +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. * - * @param domain The simulation domain the driver runs in. + * @param coroutineScope The [CoroutineScope] the driver runs in. + * @param clock The virtual clock to keep track of time. * @param uid The unique identifier of the machine. * @param name An optional name of the machine. * @param metadata The initial metadata of the node. 
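The KDoc change above reflects the pattern applied throughout this patch: components take a plain CoroutineScope and a java.time.Clock instead of an odcsim Domain. A minimal sketch of that shape, assuming nothing beyond those two injected dependencies; DemoDriver and its body are illustrative, not the actual SimpleBareMetalDriver.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import java.time.Clock
import java.util.UUID

class DemoDriver(
    private val coroutineScope: CoroutineScope, // replaces the old `domain: Domain`
    private val clock: Clock,                   // virtual clock, injected explicitly
    val uid: UUID,
    val name: String
) {
    // Background work runs on the injected scope instead of the Domain.
    fun startMonitoring(): Job = coroutineScope.launch {
        println("[$name] powered on at t=${clock.millis()} (virtual ms)")
    }
}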
@@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext */ @OptIn(ExperimentalCoroutinesApi::class) public class SimpleBareMetalDriver( - private val domain: Domain, + private val coroutineScope: CoroutineScope, + private val clock: Clock, uid: UUID, name: String, metadata: Map<String, Any>, @@ -129,14 +128,14 @@ public class SimpleBareMetalDriver( */ private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - override suspend fun init(): Node = withContext(domain.coroutineContext) { - nodeState.value + override suspend fun init(): Node { + return nodeState.value } - override suspend fun start(): Node = withContext(domain.coroutineContext) { + override suspend fun start(): Node { val node = nodeState.value if (node.state != NodeState.SHUTOFF) { - return@withContext node + return node } val events = EventFlow<ServerEvent>() @@ -153,13 +152,13 @@ public class SimpleBareMetalDriver( setNode(node.copy(state = NodeState.BOOT, server = server)) serverContext = BareMetalServerContext(events) - return@withContext nodeState.value + return nodeState.value } - override suspend fun stop(): Node = withContext(domain.coroutineContext) { + override suspend fun stop(): Node { val node = nodeState.value if (node.state == NodeState.SHUTOFF) { - return@withContext node + return node } // We terminate the image running on the machine @@ -167,20 +166,20 @@ public class SimpleBareMetalDriver( serverContext = null setNode(node.copy(state = NodeState.SHUTOFF, server = null)) - return@withContext node + return node } - override suspend fun reboot(): Node = withContext(domain.coroutineContext) { + override suspend fun reboot(): Node { stop() - start() + return start() } - override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun setImage(image: Image): Node { setNode(nodeState.value.copy(image = image)) - return@withContext nodeState.value + return nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } + override suspend fun refresh(): Node = nodeState.value private fun setNode(value: Node) { val field = nodeState.value @@ -207,7 +206,10 @@ public class SimpleBareMetalDriver( override val server: Server get() = nodeState.value.server!! - private val job = domain.launch { + override val clock: Clock + get() = this@SimpleBareMetalDriver.clock + + private val job = coroutineScope.launch { delay(1) // TODO Introduce boot time init() try { @@ -265,18 +267,13 @@ public class SimpleBareMetalDriver( private var usageFlush: DisposableHandle? = null /** - * Cache the [Clock] for timing. - */ - private val clock = domain.coroutineContext[SimulationContext]!!.clock - - /** * Cache the [Delay] instance for timing. * * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. * XXX Note however that this is an ugly hack which may break in the future. 
*/ @OptIn(InternalCoroutinesApi::class) - private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay + private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay @OptIn(InternalCoroutinesApi::class) override fun onRun( @@ -353,10 +350,10 @@ public class SimpleBareMetalDriver( currentDisposable?.dispose() // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1, Runnable { + usageFlush = delay.invokeOnTimeout(1) { usageState.value = 0.0 usageFlush = null - }) + } } select.disposeOnSelect(disposable) @@ -458,7 +455,7 @@ public class SimpleBareMetalDriver( } override val scope: CoroutineScope - get() = domain + get() = coroutineScope override suspend fun fail() { serverContext?.unavailable = true diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index f6b236ae..f64f9b5a 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -24,44 +24,41 @@ package com.atlarge.opendc.compute.metal.service -import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService(val domain: Domain) : ProvisioningService { +public class SimpleProvisioningService : ProvisioningService { /** * The active nodes in this service. */ private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { + override suspend fun create(driver: BareMetalDriver): Node { val node = driver.init() nodes[node] = driver - return@withContext node + return node } - override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys } + override suspend fun nodes(): Set<Node> = nodes.keys - override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) { - return@withContext nodes[node]!!.refresh() + override suspend fun refresh(node: Node): Node { + return nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun deploy(node: Node, image: Image): Node { val driver = nodes[node]!! driver.setImage(image) - val newNode = driver.reboot() - return@withContext newNode + return driver.reboot() } - override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) { + override suspend fun stop(node: Node): Node { val driver = nodes[node]!! 
- try { + return try { driver.stop() } catch (e: CancellationException) { node diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt index 1e7e351f..69b0124d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 607759a8..bd395f0d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.core.resource.TagContainer -import java.util.UUID import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine +import java.util.UUID /** * A hypervisor managing the VMs of a node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 192db413..df45f440 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server @@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DisposableHandle @@ -59,6 +54,17 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select +import mu.KotlinLogging +import java.time.Clock +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} /** * A [VirtDriver] that is backed by a simple hypervisor implementation. 
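The file-level logger added above replaces the simulationContext.log calls removed in the next hunk. A short sketch of the resulting usage, assuming only the mu.KotlinLogging dependency already imported in this diff; reportFailure is an illustrative function, not OpenDC code.

import mu.KotlinLogging

// One logger per file, created once at load time.
private val logger = KotlinLogging.logger {}

fun reportFailure(e: Exception) {
    // The (message, throwable) overload matches the scheduler's error call below;
    // the lambda form defers message construction until the level is enabled.
    logger.error("Hypervisor scheduler failed", e)
    logger.debug { "failure details: ${e.message}" }
}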
@@ -97,7 +103,7 @@ class SimpleVirtDriver( scheduler() } catch (e: Exception) { if (e !is CancellationException) { - simulationContext.log.error("Hypervisor scheduler failed", e) + logger.error("Hypervisor scheduler failed", e) } throw e } @@ -117,8 +123,14 @@ class SimpleVirtDriver( val events = EventFlow<ServerEvent>() val server = Server( - UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD, - ServiceRegistry(), events + UUID.randomUUID(), + name, + emptyMap(), + flavor, + image, + ServerState.BUILD, + ServiceRegistry(), + events ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events)) @@ -181,7 +193,7 @@ class SimpleVirtDriver( * The scheduling process of the hypervisor. */ private suspend fun scheduler() { - val clock = simulationContext.clock + val clock = hostContext.clock val maxUsage = hostContext.cpus.sumByDouble { it.frequency } val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } @@ -561,6 +573,9 @@ class SimpleVirtDriver( override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + override val clock: Clock + get() = hostContext.clock + init { vm = Vm(this) } diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index b1844f67..1002d382 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for a hypervisor running on some host server and communicating with the central compute service to diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 79388bc3..6b2cfc40 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,8 +1,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent @@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.math.max -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import 
kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext import mu.KotlinLogging +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.math.max private val logger = KotlinLogging.logger {} @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( - public override val allocationPolicy: AllocationPolicy, - private val ctx: SimulationContext, - private val provisioningService: ProvisioningService -) : VirtProvisioningService, CoroutineScope by ctx.domain { + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val provisioningService: ProvisioningService, + override val allocationPolicy: AllocationPolicy +) : VirtProvisioningService { /** * The hypervisors that have been launched by the service. */ @@ -59,11 +53,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - public var submittedVms = 0 - public var queuedVms = 0 - public var runningVms = 0 - public var finishedVms = 0 - public var unscheduledVms = 0 + var submittedVms = 0 + var queuedVms = 0 + var runningVms = 0 + var finishedVms = 0 + var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -81,7 +75,7 @@ class SimpleVirtProvisioningService( override val events: Flow<VirtProvisioningEvent> = eventFlow init { - launch { + coroutineScope.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage @@ -96,27 +90,29 @@ class SimpleVirtProvisioningService( } } - override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) { - availableHypervisors.map { it.driver }.toSet() + override suspend fun drivers(): Set<VirtDriver> { + return availableHypervisors.map { it.driver }.toSet() } override suspend fun deploy( name: String, image: Image, flavor: Flavor - ): Server = withContext(coroutineContext) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - )) - - suspendCancellableCoroutine<Server> { cont -> + ): Server { + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + return suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance requestCycle() @@ -139,9 +135,9 @@ class SimpleVirtProvisioningService( // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. 
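// Illustrative arithmetic for the computation below, not part of the patch:
// with quantum = 60_000 ms and clock.millis() == 130_000, the next slot is at
// t = 180_000, so delay = 60_000 - (130_000 % 60_000) = 50_000 ms. When the
// clock sits exactly on a slot boundary the delay is a full quantum, which
// keeps the VM slices of successive scheduling rounds aligned.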
- val delay = quantum - (ctx.clock.millis() % quantum) + val delay = quantum - (clock.millis() % quantum) - val call = launch { + val call = coroutineScope.launch { delay(delay) this@SimpleVirtProvisioningService.call = null schedule() @@ -150,7 +146,6 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { @@ -159,16 +154,18 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - ++unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + ) + ) incomingImages -= imageInstance @@ -180,7 +177,7 @@ class SimpleVirtProvisioningService( } try { - logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } + logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -197,16 +194,18 @@ class SimpleVirtProvisioningService( imageInstance.server = server imageInstance.continuation.resume(server) - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) activeImages += imageInstance server.events @@ -214,18 +213,20 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - )) + logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." 
} + + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -238,7 +239,7 @@ class SimpleVirtProvisioningService( } } } - .launchIn(this) + .launchIn(coroutineScope) } catch (e: InsufficientMemoryOnServerException) { logger.error("Failed to deploy VM", e) @@ -254,7 +255,7 @@ class SimpleVirtProvisioningService( private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" } if (server in hypervisors) { // Corner case for when the hypervisor already exists @@ -272,16 +273,18 @@ class SimpleVirtProvisioningService( hypervisors[server] = hv } - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) // Re-schedule on the new machine if (incomingImages.isNotEmpty()) { @@ -289,20 +292,22 @@ class SimpleVirtProvisioningService( } } ServerState.SHUTOFF, ServerState.ERROR -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } val hv = hypervisors[server] ?: return availableHypervisors -= hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) if (incomingImages.isNotEmpty()) { requestCycle() @@ -318,16 +323,18 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) hv.driver.events .onEach { event -> @@ -335,7 +342,7 @@ class SimpleVirtProvisioningService( hv.numberOfActiveServers = event.numberOfActiveServers hv.availableMemory = event.availableMemory } - }.launchIn(this) + }.launchIn(coroutineScope) requestCycle() } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt index 1c7b751c..417db77d 100644 --- 
a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.compute.core.image -import java.util.UUID import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import java.util.UUID /** * Test suite for [FlopsApplicationImage] diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index af9d3421..80c9c547 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID internal class SimpleBareMetalDriverTest { /** @@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index ed2256c0..37cd5898 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -25,17 +25,18 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID /** * Test suite 
for the [SimpleProvisioningService]. @@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest { val system = provider("sim") val root = system.newDomain(name = "root") root.launch { + val clock = simulationContext.clock val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - val provisioner = SimpleProvisioningService(dom) + val provisioner = SimpleProvisioningService() provisioner.create(driver) delay(5) val nodes = provisioner.nodes() diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 622b185e..528434b1 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.virt import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn @@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import java.util.ServiceLoader +import java.util.UUID /** * Basic test-suite for the hypervisor. 
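As the test hunks above and below show, tests now read the virtual clock once via simulationContext and pass it to the driver constructor right after the domain. A condensed fragment of that setup, reusing only names that appear in this diff and meant to sit inside a test body such as the ones shown here.

root.launch {
    val clock = simulationContext.clock // virtual clock of the running simulation
    val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
    val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }

    // The clock is now an explicit constructor argument.
    val driver = SimpleBareMetalDriver(
        root.newDomain("driver"), clock,
        UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()
    )
}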
@@ -62,6 +63,7 @@ internal class HypervisorTest { val root = system.newDomain("root") root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) @@ -70,7 +72,7 @@ internal class HypervisorTest { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -108,26 +110,41 @@ internal class HypervisorTest { var overcommissionedBurst = 0L root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val duration = 5 * 60L - val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) - ), 2, 0) - val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) - ), 2, 0) + val vmImageA = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), + 2, + 0 + ) + val vmImageB = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) + ), + 2, + 0 + ) val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) |
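Looking back at the SimpleVirtProvisioningService hunks, the class no longer implements CoroutineScope by delegating to a Domain, so flow collectors are started with launchIn(coroutineScope) rather than launchIn(this). A self-contained sketch of that pattern; DemoService and the string events are illustrative and not OpenDC types.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

class DemoService(private val coroutineScope: CoroutineScope) {
    fun watch(events: Flow<String>) {
        events
            .onEach { println("server event: $it") }
            .launchIn(coroutineScope) // collectors live on the injected scope
    }
}

fun main() = runBlocking {
    DemoService(this).watch(flowOf("BUILD", "ACTIVE", "SHUTOFF"))
}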

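One more pattern worth isolating from the SimpleBareMetalDriver hunks: the Delay instance used to flush the usage state is now looked up in the injected scope's coroutine context. The sketch below reproduces that lookup under the same assumption the diff's XXX comment makes, namely that the scope's dispatcher implements the internal kotlinx.coroutines Delay interface; scheduleUsageFlush is an illustrative name, and the exact invokeOnTimeout overload available depends on the kotlinx.coroutines version in use.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.InternalCoroutinesApi
import kotlin.coroutines.ContinuationInterceptor

@OptIn(InternalCoroutinesApi::class)
fun scheduleUsageFlush(scope: CoroutineScope, flush: () -> Unit): DisposableHandle {
    // The scope's ContinuationInterceptor is the simulator's dispatcher; the cast
    // fails fast if it does not implement Delay.
    val delay = scope.coroutineContext[ContinuationInterceptor] as Delay
    // Fire once, one virtual millisecond from now; the diff uses the
    // trailing-lambda form of this call.
    return delay.invokeOnTimeout(1, Runnable { flush() })
}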