summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 21:14:20 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:40:57 +0200
commitc41d201343263346ac84855a0b2254051ed33c21 (patch)
tree9141a382f9e1b2d924e9a191e53cc6daa9107563 /simulator/opendc/opendc-compute/src
parentc543f55e961f9f7468e19c1c0f5f20566d07dfb5 (diff)
Eliminate use of Domain and simulationContext in OpenDC
This change takes the first step in eliminating the explict use of Domain and simulationContext from OpenDC. In this way, we decouple the logic of various datacenter services from simulation logic, which should promote re-use.
Diffstat (limited to 'simulator/opendc/opendc-compute/src')
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt69
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt23
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt33
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt221
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt10
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt49
16 files changed, 237 insertions, 198 deletions
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index fd0fc836..01968cd8 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A server instance that is running on some physical or virtual machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index f770fa49..817bee4b 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.ServiceKey
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
+import java.time.Clock
/**
* Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
+ * The virtual clock.
+ */
+ public val clock: Clock
+
+ /**
* The server on which the image runs.
*/
public val server: Server
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index c615d865..0e1af093 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.compute.core.image
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import java.util.UUID
@@ -16,8 +15,7 @@ class VmImage(
) : Image {
override suspend fun invoke(ctx: ServerContext) {
- val clock = simulationContext.clock
- var offset = clock.millis()
+ var offset = ctx.clock.millis()
val batch = flopsHistory.map { fragment ->
val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index cb637aea..7cb4c0c5 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 17d8ee53..41cec291 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for the management interface of a bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index a453e459..c118cc3d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
import com.atlarge.opendc.compute.core.Flavor
@@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.lang.Exception
-import java.time.Clock
-import java.util.UUID
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
-import kotlin.random.Random
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
@@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
-import kotlinx.coroutines.withContext
+import java.lang.Exception
+import java.time.Clock
+import java.util.UUID
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
*
- * @param domain The simulation domain the driver runs in.
+ * @param coroutineScope The [CoroutineScope] the driver runs in.
+ * @param clock The virtual clock to keep track of time.
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
* @param metadata The initial metadata of the node.
@@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext
*/
@OptIn(ExperimentalCoroutinesApi::class)
public class SimpleBareMetalDriver(
- private val domain: Domain,
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
uid: UUID,
name: String,
metadata: Map<String, Any>,
@@ -129,14 +128,14 @@ public class SimpleBareMetalDriver(
*/
private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
- override suspend fun init(): Node = withContext(domain.coroutineContext) {
- nodeState.value
+ override suspend fun init(): Node {
+ return nodeState.value
}
- override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ override suspend fun start(): Node {
val node = nodeState.value
if (node.state != NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
val events = EventFlow<ServerEvent>()
@@ -153,13 +152,13 @@ public class SimpleBareMetalDriver(
setNode(node.copy(state = NodeState.BOOT, server = server))
serverContext = BareMetalServerContext(events)
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(): Node {
val node = nodeState.value
if (node.state == NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
// We terminate the image running on the machine
@@ -167,20 +166,20 @@ public class SimpleBareMetalDriver(
serverContext = null
setNode(node.copy(state = NodeState.SHUTOFF, server = null))
- return@withContext node
+ return node
}
- override suspend fun reboot(): Node = withContext(domain.coroutineContext) {
+ override suspend fun reboot(): Node {
stop()
- start()
+ return start()
}
- override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun setImage(image: Image): Node {
setNode(nodeState.value.copy(image = image))
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
+ override suspend fun refresh(): Node = nodeState.value
private fun setNode(value: Node) {
val field = nodeState.value
@@ -207,7 +206,10 @@ public class SimpleBareMetalDriver(
override val server: Server
get() = nodeState.value.server!!
- private val job = domain.launch {
+ override val clock: Clock
+ get() = this@SimpleBareMetalDriver.clock
+
+ private val job = coroutineScope.launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -265,18 +267,13 @@ public class SimpleBareMetalDriver(
private var usageFlush: DisposableHandle? = null
/**
- * Cache the [Clock] for timing.
- */
- private val clock = domain.coroutineContext[SimulationContext]!!.clock
-
- /**
* Cache the [Delay] instance for timing.
*
* XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy.
* XXX Note however that this is an ugly hack which may break in the future.
*/
@OptIn(InternalCoroutinesApi::class)
- private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay
+ private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay
@OptIn(InternalCoroutinesApi::class)
override fun onRun(
@@ -353,10 +350,10 @@ public class SimpleBareMetalDriver(
currentDisposable?.dispose()
// Schedule reset the usage of the machine since the call is returning
- usageFlush = delay.invokeOnTimeout(1, Runnable {
+ usageFlush = delay.invokeOnTimeout(1) {
usageState.value = 0.0
usageFlush = null
- })
+ }
}
select.disposeOnSelect(disposable)
@@ -458,7 +455,7 @@ public class SimpleBareMetalDriver(
}
override val scope: CoroutineScope
- get() = domain
+ get() = coroutineScope
override suspend fun fail() {
serverContext?.unavailable = true
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index f6b236ae..f64f9b5a 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -24,44 +24,41 @@
package com.atlarge.opendc.compute.metal.service
-import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService(val domain: Domain) : ProvisioningService {
+public class SimpleProvisioningService : ProvisioningService {
/**
* The active nodes in this service.
*/
private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
- override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
+ override suspend fun create(driver: BareMetalDriver): Node {
val node = driver.init()
nodes[node] = driver
- return@withContext node
+ return node
}
- override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys }
+ override suspend fun nodes(): Set<Node> = nodes.keys
- override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) {
- return@withContext nodes[node]!!.refresh()
+ override suspend fun refresh(node: Node): Node {
+ return nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun deploy(node: Node, image: Image): Node {
val driver = nodes[node]!!
driver.setImage(image)
- val newNode = driver.reboot()
- return@withContext newNode
+ return driver.reboot()
}
- override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(node: Node): Node {
val driver = nodes[node]!!
- try {
+ return try {
driver.stop()
} catch (e: CancellationException) {
node
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
index 1e7e351f..69b0124d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 607759a8..bd395f0d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
-import java.util.UUID
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
+import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 192db413..df45f440 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.virt.driver
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
@@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
@@ -59,6 +54,17 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
+import mu.KotlinLogging
+import java.time.Clock
+import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
@@ -97,7 +103,7 @@ class SimpleVirtDriver(
scheduler()
} catch (e: Exception) {
if (e !is CancellationException) {
- simulationContext.log.error("Hypervisor scheduler failed", e)
+ logger.error("Hypervisor scheduler failed", e)
}
throw e
}
@@ -117,8 +123,14 @@ class SimpleVirtDriver(
val events = EventFlow<ServerEvent>()
val server = Server(
- UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD,
- ServiceRegistry(), events
+ UUID.randomUUID(),
+ name,
+ emptyMap(),
+ flavor,
+ image,
+ ServerState.BUILD,
+ ServiceRegistry(),
+ events
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events))
@@ -181,7 +193,7 @@ class SimpleVirtDriver(
* The scheduling process of the hypervisor.
*/
private suspend fun scheduler() {
- val clock = simulationContext.clock
+ val clock = hostContext.clock
val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
@@ -561,6 +573,9 @@ class SimpleVirtDriver(
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ override val clock: Clock
+ get() = hostContext.clock
+
init {
vm = Vm(this)
}
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index b1844f67..1002d382 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for a hypervisor running on some host server and communicating with the central compute service to
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 79388bc3..6b2cfc40 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,8 +1,6 @@
package com.atlarge.opendc.compute.virt.service
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
@@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.math.max
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.math.max
private val logger = KotlinLogging.logger {}
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
- public override val allocationPolicy: AllocationPolicy,
- private val ctx: SimulationContext,
- private val provisioningService: ProvisioningService
-) : VirtProvisioningService, CoroutineScope by ctx.domain {
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ private val provisioningService: ProvisioningService,
+ override val allocationPolicy: AllocationPolicy
+) : VirtProvisioningService {
/**
* The hypervisors that have been launched by the service.
*/
@@ -59,11 +53,11 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- public var submittedVms = 0
- public var queuedVms = 0
- public var runningVms = 0
- public var finishedVms = 0
- public var unscheduledVms = 0
+ var submittedVms = 0
+ var queuedVms = 0
+ var runningVms = 0
+ var finishedVms = 0
+ var unscheduledVms = 0
private var maxCores = 0
private var maxMemory = 0L
@@ -81,7 +75,7 @@ class SimpleVirtProvisioningService(
override val events: Flow<VirtProvisioningEvent> = eventFlow
init {
- launch {
+ coroutineScope.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
val hypervisorImage = HypervisorImage
@@ -96,27 +90,29 @@ class SimpleVirtProvisioningService(
}
}
- override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
- availableHypervisors.map { it.driver }.toSet()
+ override suspend fun drivers(): Set<VirtDriver> {
+ return availableHypervisors.map { it.driver }.toSet()
}
override suspend fun deploy(
name: String,
image: Image,
flavor: Flavor
- ): Server = withContext(coroutineContext) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
- ))
-
- suspendCancellableCoroutine<Server> { cont ->
+ ): Server {
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ ++submittedVms,
+ runningVms,
+ finishedVms,
+ ++queuedVms,
+ unscheduledVms
+ )
+ )
+
+ return suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
requestCycle()
@@ -139,9 +135,9 @@ class SimpleVirtProvisioningService(
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (clock.millis() % quantum)
- val call = launch {
+ val call = coroutineScope.launch {
delay(delay)
this@SimpleVirtProvisioningService.call = null
schedule()
@@ -150,7 +146,6 @@ class SimpleVirtProvisioningService(
}
private suspend fun schedule() {
- val clock = simulationContext.clock
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
@@ -159,16 +154,18 @@ class SimpleVirtProvisioningService(
if (selectedHv == null) {
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- ++unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ ++unscheduledVms
+ )
+ )
incomingImages -= imageInstance
@@ -180,7 +177,7 @@ class SimpleVirtProvisioningService(
}
try {
- logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
+ logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -197,16 +194,18 @@ class SimpleVirtProvisioningService(
imageInstance.server = server
imageInstance.continuation.resume(server)
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
+ )
activeImages += imageInstance
server.events
@@ -214,18 +213,20 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
-
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
+
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -238,7 +239,7 @@ class SimpleVirtProvisioningService(
}
}
}
- .launchIn(this)
+ .launchIn(coroutineScope)
} catch (e: InsufficientMemoryOnServerException) {
logger.error("Failed to deploy VM", e)
@@ -254,7 +255,7 @@ class SimpleVirtProvisioningService(
private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" }
if (server in hypervisors) {
// Corner case for when the hypervisor already exists
@@ -272,16 +273,18 @@ class SimpleVirtProvisioningService(
hypervisors[server] = hv
}
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
// Re-schedule on the new machine
if (incomingImages.isNotEmpty()) {
@@ -289,20 +292,22 @@ class SimpleVirtProvisioningService(
}
}
ServerState.SHUTOFF, ServerState.ERROR -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
if (incomingImages.isNotEmpty()) {
requestCycle()
@@ -318,16 +323,18 @@ class SimpleVirtProvisioningService(
hv.driver = server.services[VirtDriver]
availableHypervisors += hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
hv.driver.events
.onEach { event ->
@@ -335,7 +342,7 @@ class SimpleVirtProvisioningService(
hv.numberOfActiveServers = event.numberOfActiveServers
hv.availableMemory = event.availableMemory
}
- }.launchIn(this)
+ }.launchIn(coroutineScope)
requestCycle()
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
index 1c7b751c..417db77d 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
@@ -24,10 +24,10 @@
package com.atlarge.opendc.compute.core.image
-import java.util.UUID
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
+import java.util.UUID
/**
* Test suite for [FlopsApplicationImage]
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index af9d3421..80c9c547 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
internal class SimpleBareMetalDriverTest {
/**
@@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest {
val dom = root.newDomain(name = "driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index ed2256c0..37cd5898 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -25,17 +25,18 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
@@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest {
val system = provider("sim")
val root = system.newDomain(name = "root")
root.launch {
+ val clock = simulationContext.clock
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
- val provisioner = SimpleProvisioningService(dom)
+ val provisioner = SimpleProvisioningService()
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 622b185e..528434b1 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
@@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Basic test-suite for the hypervisor.
@@ -62,6 +63,7 @@ internal class HypervisorTest {
val root = system.newDomain("root")
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
@@ -70,7 +72,7 @@ internal class HypervisorTest {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -108,26 +110,41 @@ internal class HypervisorTest {
var overcommissionedBurst = 0L
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val duration = 5 * 60L
- val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
- ), 2, 0)
- val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
- ), 2, 0)
+ val vmImageA = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ ),
+ 2,
+ 0
+ )
+ val vmImageB = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ ),
+ 2,
+ 0
+ )
val driverDom = root.newDomain("driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)