diff options
2 files changed, 45 insertions, 1 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 1330158e..57c62c31 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -28,12 +28,18 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState +import kotlinx.coroutines.flow.Flow /** * A driver interface for the management interface of a bare-metal compute node. */ public interface BareMetalDriver { /** + * The load of the machine. + */ + public val load: Flow<Double> + + /** * Initialize the driver. */ public suspend fun init(monitor: ServerMonitor): Node diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index fcdc9363..90080b91 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -38,8 +38,14 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -83,6 +89,19 @@ public class SimpleBareMetalDriver( */ private var job: Job? = null + /** + * The channel containing the load of the server. + */ + @UseExperimental(ExperimentalCoroutinesApi::class) + private val loadChannel = BroadcastChannel<Double>(Channel.CONFLATED) + + @UseExperimental(FlowPreview::class) + override val load: Flow<Double> = loadChannel.asFlow() + + init { + loadChannel.offer(0.0) + } + override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { this@SimpleBareMetalDriver.monitor = monitor return@withContext node @@ -167,11 +186,18 @@ public class SimpleBareMetalDriver( domain.launch { monitor.onUpdate(server, previousState) } } + private var flush: Job? = null + override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { require(burst.size == limit.size) { "Array dimensions do not match" } + // If run is called in at the same timestamp as the previous call, cancel the load flush + flush?.cancel() + flush = null + val start = simulationContext.clock.millis() var duration = max(0, deadline - start) + var load = 0.0 // Determine the duration of the first CPU to finish for (i in 0 until min(cpus.size, burst.size)) { @@ -179,19 +205,31 @@ public class SimpleBareMetalDriver( val usage = min(limit[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + load += usage / cpu.frequency + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst duration = min(duration, cpuDuration) } } + loadChannel.offer(load) + try { delay(duration) } catch (_: CancellationException) { // On cancellation, we compute and return the remaining burst } - val end = simulationContext.clock.millis() + // Flush the load if the do not receive a new run call for the same timestamp + flush = domain.launch { + delay(1) + loadChannel.offer(0.0) + } + flush!!.invokeOnCompletion { + flush = null + } + // Write back the remaining burst time for (i in 0 until min(cpus.size, burst.size)) { val usage = min(limit[i], cpus[i].frequency) * 1_000_000 |
