summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-compute/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc/opendc-compute/src')
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt69
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt23
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt33
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt221
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt10
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt49
16 files changed, 237 insertions, 198 deletions
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index fd0fc836..01968cd8 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A server instance that is running on some physical or virtual machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index f770fa49..817bee4b 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.ServiceKey
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
+import java.time.Clock
/**
* Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
+ * The virtual clock.
+ */
+ public val clock: Clock
+
+ /**
* The server on which the image runs.
*/
public val server: Server
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index c615d865..0e1af093 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.compute.core.image
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import java.util.UUID
@@ -16,8 +15,7 @@ class VmImage(
) : Image {
override suspend fun invoke(ctx: ServerContext) {
- val clock = simulationContext.clock
- var offset = clock.millis()
+ var offset = ctx.clock.millis()
val batch = flopsHistory.map { fragment ->
val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index cb637aea..7cb4c0c5 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 17d8ee53..41cec291 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for the management interface of a bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index a453e459..c118cc3d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
import com.atlarge.opendc.compute.core.Flavor
@@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.lang.Exception
-import java.time.Clock
-import java.util.UUID
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
-import kotlin.random.Random
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
@@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
-import kotlinx.coroutines.withContext
+import java.lang.Exception
+import java.time.Clock
+import java.util.UUID
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
*
- * @param domain The simulation domain the driver runs in.
+ * @param coroutineScope The [CoroutineScope] the driver runs in.
+ * @param clock The virtual clock to keep track of time.
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
* @param metadata The initial metadata of the node.
@@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext
*/
@OptIn(ExperimentalCoroutinesApi::class)
public class SimpleBareMetalDriver(
- private val domain: Domain,
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
uid: UUID,
name: String,
metadata: Map<String, Any>,
@@ -129,14 +128,14 @@ public class SimpleBareMetalDriver(
*/
private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
- override suspend fun init(): Node = withContext(domain.coroutineContext) {
- nodeState.value
+ override suspend fun init(): Node {
+ return nodeState.value
}
- override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ override suspend fun start(): Node {
val node = nodeState.value
if (node.state != NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
val events = EventFlow<ServerEvent>()
@@ -153,13 +152,13 @@ public class SimpleBareMetalDriver(
setNode(node.copy(state = NodeState.BOOT, server = server))
serverContext = BareMetalServerContext(events)
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(): Node {
val node = nodeState.value
if (node.state == NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
// We terminate the image running on the machine
@@ -167,20 +166,20 @@ public class SimpleBareMetalDriver(
serverContext = null
setNode(node.copy(state = NodeState.SHUTOFF, server = null))
- return@withContext node
+ return node
}
- override suspend fun reboot(): Node = withContext(domain.coroutineContext) {
+ override suspend fun reboot(): Node {
stop()
- start()
+ return start()
}
- override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun setImage(image: Image): Node {
setNode(nodeState.value.copy(image = image))
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
+ override suspend fun refresh(): Node = nodeState.value
private fun setNode(value: Node) {
val field = nodeState.value
@@ -207,7 +206,10 @@ public class SimpleBareMetalDriver(
override val server: Server
get() = nodeState.value.server!!
- private val job = domain.launch {
+ override val clock: Clock
+ get() = this@SimpleBareMetalDriver.clock
+
+ private val job = coroutineScope.launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -265,18 +267,13 @@ public class SimpleBareMetalDriver(
private var usageFlush: DisposableHandle? = null
/**
- * Cache the [Clock] for timing.
- */
- private val clock = domain.coroutineContext[SimulationContext]!!.clock
-
- /**
* Cache the [Delay] instance for timing.
*
* XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy.
* XXX Note however that this is an ugly hack which may break in the future.
*/
@OptIn(InternalCoroutinesApi::class)
- private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay
+ private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay
@OptIn(InternalCoroutinesApi::class)
override fun onRun(
@@ -353,10 +350,10 @@ public class SimpleBareMetalDriver(
currentDisposable?.dispose()
// Schedule reset the usage of the machine since the call is returning
- usageFlush = delay.invokeOnTimeout(1, Runnable {
+ usageFlush = delay.invokeOnTimeout(1) {
usageState.value = 0.0
usageFlush = null
- })
+ }
}
select.disposeOnSelect(disposable)
@@ -458,7 +455,7 @@ public class SimpleBareMetalDriver(
}
override val scope: CoroutineScope
- get() = domain
+ get() = coroutineScope
override suspend fun fail() {
serverContext?.unavailable = true
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index f6b236ae..f64f9b5a 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -24,44 +24,41 @@
package com.atlarge.opendc.compute.metal.service
-import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService(val domain: Domain) : ProvisioningService {
+public class SimpleProvisioningService : ProvisioningService {
/**
* The active nodes in this service.
*/
private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
- override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
+ override suspend fun create(driver: BareMetalDriver): Node {
val node = driver.init()
nodes[node] = driver
- return@withContext node
+ return node
}
- override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys }
+ override suspend fun nodes(): Set<Node> = nodes.keys
- override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) {
- return@withContext nodes[node]!!.refresh()
+ override suspend fun refresh(node: Node): Node {
+ return nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun deploy(node: Node, image: Image): Node {
val driver = nodes[node]!!
driver.setImage(image)
- val newNode = driver.reboot()
- return@withContext newNode
+ return driver.reboot()
}
- override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(node: Node): Node {
val driver = nodes[node]!!
- try {
+ return try {
driver.stop()
} catch (e: CancellationException) {
node
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
index 1e7e351f..69b0124d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 607759a8..bd395f0d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
-import java.util.UUID
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
+import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 192db413..df45f440 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.virt.driver
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
@@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
@@ -59,6 +54,17 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
+import mu.KotlinLogging
+import java.time.Clock
+import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
@@ -97,7 +103,7 @@ class SimpleVirtDriver(
scheduler()
} catch (e: Exception) {
if (e !is CancellationException) {
- simulationContext.log.error("Hypervisor scheduler failed", e)
+ logger.error("Hypervisor scheduler failed", e)
}
throw e
}
@@ -117,8 +123,14 @@ class SimpleVirtDriver(
val events = EventFlow<ServerEvent>()
val server = Server(
- UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD,
- ServiceRegistry(), events
+ UUID.randomUUID(),
+ name,
+ emptyMap(),
+ flavor,
+ image,
+ ServerState.BUILD,
+ ServiceRegistry(),
+ events
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events))
@@ -181,7 +193,7 @@ class SimpleVirtDriver(
* The scheduling process of the hypervisor.
*/
private suspend fun scheduler() {
- val clock = simulationContext.clock
+ val clock = hostContext.clock
val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
@@ -561,6 +573,9 @@ class SimpleVirtDriver(
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ override val clock: Clock
+ get() = hostContext.clock
+
init {
vm = Vm(this)
}
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index b1844f67..1002d382 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for a hypervisor running on some host server and communicating with the central compute service to
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 79388bc3..6b2cfc40 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,8 +1,6 @@
package com.atlarge.opendc.compute.virt.service
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
@@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.math.max
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.math.max
private val logger = KotlinLogging.logger {}
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
- public override val allocationPolicy: AllocationPolicy,
- private val ctx: SimulationContext,
- private val provisioningService: ProvisioningService
-) : VirtProvisioningService, CoroutineScope by ctx.domain {
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ private val provisioningService: ProvisioningService,
+ override val allocationPolicy: AllocationPolicy
+) : VirtProvisioningService {
/**
* The hypervisors that have been launched by the service.
*/
@@ -59,11 +53,11 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- public var submittedVms = 0
- public var queuedVms = 0
- public var runningVms = 0
- public var finishedVms = 0
- public var unscheduledVms = 0
+ var submittedVms = 0
+ var queuedVms = 0
+ var runningVms = 0
+ var finishedVms = 0
+ var unscheduledVms = 0
private var maxCores = 0
private var maxMemory = 0L
@@ -81,7 +75,7 @@ class SimpleVirtProvisioningService(
override val events: Flow<VirtProvisioningEvent> = eventFlow
init {
- launch {
+ coroutineScope.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
val hypervisorImage = HypervisorImage
@@ -96,27 +90,29 @@ class SimpleVirtProvisioningService(
}
}
- override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
- availableHypervisors.map { it.driver }.toSet()
+ override suspend fun drivers(): Set<VirtDriver> {
+ return availableHypervisors.map { it.driver }.toSet()
}
override suspend fun deploy(
name: String,
image: Image,
flavor: Flavor
- ): Server = withContext(coroutineContext) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
- ))
-
- suspendCancellableCoroutine<Server> { cont ->
+ ): Server {
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ ++submittedVms,
+ runningVms,
+ finishedVms,
+ ++queuedVms,
+ unscheduledVms
+ )
+ )
+
+ return suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
requestCycle()
@@ -139,9 +135,9 @@ class SimpleVirtProvisioningService(
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (clock.millis() % quantum)
- val call = launch {
+ val call = coroutineScope.launch {
delay(delay)
this@SimpleVirtProvisioningService.call = null
schedule()
@@ -150,7 +146,6 @@ class SimpleVirtProvisioningService(
}
private suspend fun schedule() {
- val clock = simulationContext.clock
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
@@ -159,16 +154,18 @@ class SimpleVirtProvisioningService(
if (selectedHv == null) {
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- ++unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ ++unscheduledVms
+ )
+ )
incomingImages -= imageInstance
@@ -180,7 +177,7 @@ class SimpleVirtProvisioningService(
}
try {
- logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
+ logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -197,16 +194,18 @@ class SimpleVirtProvisioningService(
imageInstance.server = server
imageInstance.continuation.resume(server)
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
+ )
activeImages += imageInstance
server.events
@@ -214,18 +213,20 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
-
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
+
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -238,7 +239,7 @@ class SimpleVirtProvisioningService(
}
}
}
- .launchIn(this)
+ .launchIn(coroutineScope)
} catch (e: InsufficientMemoryOnServerException) {
logger.error("Failed to deploy VM", e)
@@ -254,7 +255,7 @@ class SimpleVirtProvisioningService(
private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" }
if (server in hypervisors) {
// Corner case for when the hypervisor already exists
@@ -272,16 +273,18 @@ class SimpleVirtProvisioningService(
hypervisors[server] = hv
}
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
// Re-schedule on the new machine
if (incomingImages.isNotEmpty()) {
@@ -289,20 +292,22 @@ class SimpleVirtProvisioningService(
}
}
ServerState.SHUTOFF, ServerState.ERROR -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
if (incomingImages.isNotEmpty()) {
requestCycle()
@@ -318,16 +323,18 @@ class SimpleVirtProvisioningService(
hv.driver = server.services[VirtDriver]
availableHypervisors += hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
hv.driver.events
.onEach { event ->
@@ -335,7 +342,7 @@ class SimpleVirtProvisioningService(
hv.numberOfActiveServers = event.numberOfActiveServers
hv.availableMemory = event.availableMemory
}
- }.launchIn(this)
+ }.launchIn(coroutineScope)
requestCycle()
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
index 1c7b751c..417db77d 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
@@ -24,10 +24,10 @@
package com.atlarge.opendc.compute.core.image
-import java.util.UUID
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
+import java.util.UUID
/**
* Test suite for [FlopsApplicationImage]
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index af9d3421..80c9c547 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
internal class SimpleBareMetalDriverTest {
/**
@@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest {
val dom = root.newDomain(name = "driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index ed2256c0..37cd5898 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -25,17 +25,18 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
@@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest {
val system = provider("sim")
val root = system.newDomain(name = "root")
root.launch {
+ val clock = simulationContext.clock
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
- val provisioner = SimpleProvisioningService(dom)
+ val provisioner = SimpleProvisioningService()
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 622b185e..528434b1 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
@@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Basic test-suite for the hypervisor.
@@ -62,6 +63,7 @@ internal class HypervisorTest {
val root = system.newDomain("root")
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
@@ -70,7 +72,7 @@ internal class HypervisorTest {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -108,26 +110,41 @@ internal class HypervisorTest {
var overcommissionedBurst = 0L
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val duration = 5 * 60L
- val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
- ), 2, 0)
- val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
- ), 2, 0)
+ val vmImageA = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ ),
+ 2,
+ 0
+ )
+ val vmImageB = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ ),
+ 2,
+ 0
+ )
val driverDom = root.newDomain("driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)