summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt6
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt40
2 files changed, 45 insertions, 1 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 1330158e..57c62c31 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -28,12 +28,18 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.PowerState
+import kotlinx.coroutines.flow.Flow
/**
* A driver interface for the management interface of a bare-metal compute node.
*/
public interface BareMetalDriver {
/**
+ * The load of the machine.
+ */
+ public val load: Flow<Double>
+
+ /**
* Initialize the driver.
*/
public suspend fun init(monitor: ServerMonitor): Node
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index fcdc9363..90080b91 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -38,8 +38,14 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.PowerState
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
+import kotlinx.coroutines.channels.BroadcastChannel
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -83,6 +89,19 @@ public class SimpleBareMetalDriver(
*/
private var job: Job? = null
+ /**
+ * The channel containing the load of the server.
+ */
+ @UseExperimental(ExperimentalCoroutinesApi::class)
+ private val loadChannel = BroadcastChannel<Double>(Channel.CONFLATED)
+
+ @UseExperimental(FlowPreview::class)
+ override val load: Flow<Double> = loadChannel.asFlow()
+
+ init {
+ loadChannel.offer(0.0)
+ }
+
override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
this@SimpleBareMetalDriver.monitor = monitor
return@withContext node
@@ -167,11 +186,18 @@ public class SimpleBareMetalDriver(
domain.launch { monitor.onUpdate(server, previousState) }
}
+ private var flush: Job? = null
+
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
require(burst.size == limit.size) { "Array dimensions do not match" }
+ // If run is called in at the same timestamp as the previous call, cancel the load flush
+ flush?.cancel()
+ flush = null
+
val start = simulationContext.clock.millis()
var duration = max(0, deadline - start)
+ var load = 0.0
// Determine the duration of the first CPU to finish
for (i in 0 until min(cpus.size, burst.size)) {
@@ -179,19 +205,31 @@ public class SimpleBareMetalDriver(
val usage = min(limit[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz
val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+ load += usage / cpu.frequency
+
if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
duration = min(duration, cpuDuration)
}
}
+ loadChannel.offer(load)
+
try {
delay(duration)
} catch (_: CancellationException) {
// On cancellation, we compute and return the remaining burst
}
-
val end = simulationContext.clock.millis()
+ // Flush the load if the do not receive a new run call for the same timestamp
+ flush = domain.launch {
+ delay(1)
+ loadChannel.offer(0.0)
+ }
+ flush!!.invokeOnCompletion {
+ flush = null
+ }
+
// Write back the remaining burst time
for (i in 0 until min(cpus.size, burst.size)) {
val usage = min(limit[i], cpus[i].frequency) * 1_000_000