author     Fabian Mastenbroek <mail.fabianm@gmail.com>  2020-05-17 18:30:54 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>  2020-05-18 22:58:12 +0200
commit     df5d9363e4e3558cb6e2f7f421412548b6d7d36a (patch)
tree       b568fc48418b2146c989d7e519073d96e5d13073 /opendc
parent     ee494d6ce6f817cf4e9ab0dba0d9f9f1987c0029 (diff)
perf: Batch slice submission
Diffstat (limited to 'opendc')
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt | 113
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt | 7
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt | 2
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt | 3
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 200
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 19
-rw-r--r--  opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt | 12
-rw-r--r--  opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt | 1
8 files changed, 270 insertions, 87 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index 663fa5e4..027ba410 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -51,36 +51,113 @@ public interface ServerContext {
public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T)
/**
- * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
- * processing a **non-zero** burst or until the deadline is reached.
+ * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as
+ * specified by [triggerMode].
*
- * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be
- * zero).
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+ * the timing of this change.
*
- * Both [burst] and [limit] must be of the same size and in any other case the method will throw an
- * [IllegalArgumentException].
+ * @param slice The representation of work to run on the processors.
+ * @param triggerMode The trigger condition to resume execution.
+ */
+ public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST) =
+ select<Unit> { onRun(slice, triggerMode).invoke {} }
+
+ /**
+ * Ask the processor cores to run the specified [batch] of work slices and suspend execution until the trigger
+ * condition is met as specified by [triggerMode].
*
- * @param burst The burst time to request from each of the processor cores.
- * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
- * @param deadline The instant at which this request needs to be fulfilled.
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+ * the timing of this change.
+ *
+ * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
+ * slices with the next slice to be executed.
+ *
+ * @param batch The batch of work to run on the processors.
+ * @param triggerMode The trigger condition to resume execution.
+ * @param merge The merge function for consecutive slices in case the last slice was not completed within its
+ * deadline.
*/
- public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
- select<Unit> { onRun(burst, limit, deadline).invoke {} }
- }
+ public suspend fun run(
+ batch: List<Slice>,
+ triggerMode: TriggerMode = TriggerMode.FIRST,
+ merge: (Slice, Slice) -> Slice = { _, r -> r }
+ ) = select<Unit> { onRun(batch, triggerMode, merge).invoke {} }
+
+ /**
+ * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified
+ * by [triggerMode].
+ *
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+ * the timing of this change.
+ *
+ * @param slice The representation of work to request from the processors.
+ * @param triggerMode The trigger condition to resume execution.
+ */
+ public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 =
+ onRun(listOf(slice), triggerMode)
/**
- * Request the specified burst time from the processor cores and suspend execution until a processor core finishes
- * processing a **non-zero** burst or until the deadline is reached.
+ * Ask the processor cores to run the specified [batch] of work slices and select when the trigger condition is met
+ * as specified by [triggerMode].
+ *
+ * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which
+ * may be zero). These changes may happen anytime during execution of this method and callers should not rely on
+ * the timing of this change.
*
- * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be
- * zero).
+ * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these
+ * slices with the next slice to be executed.
+ *
+ * @param batch The batch of work to run on the processors.
+ * @param triggerMode The trigger condition to resume execution during the **last** slice.
+ * @param merge The merge function for consecutive slices in case the last slice was not completed within its
+ * deadline.
+ */
+ public fun onRun(
+ batch: List<Slice>,
+ triggerMode: TriggerMode = TriggerMode.FIRST,
+ merge: (Slice, Slice) -> Slice = { _, r -> r }
+ ): SelectClause0
+
+ /**
+ * A request to the host machine for a slice of CPU time from the processor cores.
*
* Both [burst] and [limit] must be of the same size and in any other case the method will throw an
* [IllegalArgumentException].
*
+ *
* @param burst The burst time to request from each of the processor cores.
* @param limit The maximum usage in terms of MHz that the processing core may use while running the burst.
- * @param deadline The instant at which this request needs to be fulfilled.
+ * @param deadline The instant at which this slice needs to be fulfilled.
*/
- public fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0
+ public class Slice(val burst: LongArray, val limit: DoubleArray, val deadline: Long) {
+ init {
+ require(burst.size == limit.size) { "Incompatible array dimensions" }
+ }
+ }
+
+ /**
+ * The modes for triggering an exit from the machine.
+ */
+ public enum class TriggerMode {
+ /**
+ * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the
+ * deadline is reached.
+ */
+ FIRST,
+
+ /**
+ * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the deadline
+ * is reached.
+ */
+ LAST,
+
+ /**
+ * A machine exit occurs only when the deadline is reached.
+ */
+ DEADLINE
+ }
}
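
For orientation, a minimal sketch of how an image might call the new slice-based API introduced above. The two-core machine, the numeric values, and the merge lambda are hypothetical; the merge argument order (upcoming slice first, unfinished slice second) follows the call site in SimpleBareMetalDriver further below.

import com.atlarge.opendc.compute.core.execution.ServerContext
import java.time.Clock

// Hypothetical sketch: submit one slice, then a batch, to a ServerContext.
suspend fun sketch(ctx: ServerContext, clock: Clock) {
    // One slice: 1_000 units of work per core, capped at 2_400 MHz, due in 500 ms.
    val slice = ServerContext.Slice(
        burst = longArrayOf(1_000, 1_000),
        limit = doubleArrayOf(2_400.0, 2_400.0),
        deadline = clock.millis() + 500
    )

    // Default trigger: resume as soon as the first core finishes a non-zero burst.
    ctx.run(slice)

    // Batch submission: resume only after the last core of the last slice finishes.
    // If a slice misses its deadline, fold its leftover burst into the next slice.
    val next = ServerContext.Slice(longArrayOf(500, 500), doubleArrayOf(2_400.0, 2_400.0), clock.millis() + 1_000)
    ctx.run(listOf(slice, next), triggerMode = ServerContext.TriggerMode.LAST) { upcoming, unfinished ->
        ServerContext.Slice(
            LongArray(upcoming.burst.size) { i -> upcoming.burst[i] + unfinished.burst.getOrElse(i) { 0L } },
            upcoming.limit,
            upcoming.deadline
        )
    }
}
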
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index e77b55a6..d65e7e94 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -26,9 +26,7 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.ensureActive
import java.util.UUID
-import kotlin.coroutines.coroutineContext
import kotlin.math.min
/**
@@ -64,9 +62,6 @@ data class FlopsApplicationImage(
val burst = LongArray(cores) { flops / cores }
val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization }
- while (burst.any { it != 0L }) {
- coroutineContext.ensureActive()
- ctx.run(burst, maxUsage, Long.MAX_VALUE)
- }
+ ctx.run(ServerContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = ServerContext.TriggerMode.LAST)
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 36bbfa45..30a091b1 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -33,7 +33,7 @@ class VmImage(
val burst = LongArray(cores) { fragment.flops / cores }
val usage = DoubleArray(cores) { fragment.usage / cores }
- ctx.run(burst, usage, clock.millis() + fragment.duration)
+ ctx.run(ServerContext.Slice(burst, usage, clock.millis() + fragment.duration))
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 41cec291..a84fb905 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -43,8 +43,7 @@ public interface BareMetalDriver : Powerable, FailureDomain {
public val node: Flow<Node>
/**
- * The amount of work done by the machine in percentage with respect to the total amount of processing power
- * available.
+ * The amount of work done by the machine in MHz.
*/
public val usage: Flow<Double>
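
Since [usage] now reports absolute MHz rather than a fraction of total capacity, consumers read it directly as a rate, in the spirit of the test change further below. A hypothetical collector (the helper name and parameters are illustrative):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Hypothetical helper: log the host's absolute CPU usage in MHz as it changes.
fun observeUsage(driver: BareMetalDriver, scope: CoroutineScope) {
    driver.usage
        .onEach { mhz -> println("host usage: $mhz MHz") }
        .launchIn(scope)
}
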
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 08f04760..2d885a8c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -28,12 +28,14 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.image.EmptyImage
@@ -48,9 +50,11 @@ import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
@@ -62,6 +66,7 @@ import kotlin.math.min
import kotlinx.coroutines.withContext
import java.lang.Exception
import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.round
import kotlin.random.Random
/**
@@ -113,7 +118,11 @@ public class SimpleBareMetalDriver(
override val node: Flow<Node> = nodeState
+ @OptIn(FlowPreview::class)
override val usage: Flow<Double> = usageState
+ // Debounce changes for 1 ms to prevent emitting two values during the same instant (e.g. a slice finishes and a
+ // new slice starts).
+ .debounce(1)
override val powerDraw: Flow<Double> = powerModel(this)
@@ -252,79 +261,168 @@ public class SimpleBareMetalDriver(
setNode(nodeState.value.copy(state = newNodeState, server = server))
}
- private var flush: DisposableHandle? = null
-
@OptIn(InternalCoroutinesApi::class)
- override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 {
- require(burst.size == limit.size) { "Array dimensions do not match" }
+ override fun onRun(
+ batch: List<ServerContext.Slice>,
+ triggerMode: ServerContext.TriggerMode,
+ merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
+ ): SelectClause0 {
assert(!finalized) { "Server instance is already finalized" }
return object : SelectClause0 {
@InternalCoroutinesApi
override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
- // If run is called in at the same timestamp as the previous call, cancel the load flush
- flush?.dispose()
- flush = null
-
val context = select.completion.context
- val simulationContext = context[SimulationContext]!!
+ val clock = context[SimulationContext]!!.clock
val delay = context[ContinuationInterceptor] as Delay
- val start = simulationContext.clock.millis()
- var duration = max(0, deadline - start)
- var totalUsage = 0.0
+ val queue = batch.iterator()
+ var start = Long.MIN_VALUE
+ var currentWork: SliceWork? = null
+ var currentDisposable: DisposableHandle? = null
- // Determine the duration of the first CPU to finish
- for (i in 0 until min(cpus.size, burst.size)) {
- val cpu = cpus[i]
- val usage = min(limit[i], cpu.frequency)
- val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+ fun schedule(slice: ServerContext.Slice) {
+ start = clock.millis()
- totalUsage += usage / cpu.frequency
-
- if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
- duration = min(duration, cpuDuration)
+ val isLastSlice = !queue.hasNext()
+ val work = SliceWork(slice)
+ val duration = when (triggerMode) {
+ ServerContext.TriggerMode.FIRST -> min(work.minExit, slice.deadline - start)
+ ServerContext.TriggerMode.LAST -> min(work.maxExit, slice.deadline - start)
+ ServerContext.TriggerMode.DEADLINE -> slice.deadline - start
}
- }
- if (!unavailable) {
- delay.invokeOnTimeout(1, Runnable {
- usageState.value = totalUsage / cpus.size
- })
- }
+ val action = Runnable {
+ currentWork = null
+
- val action = Runnable {
- // todo: we could have replaced startCoroutine with startCoroutineUndispatched
- // But we need a way to know that Delay.invokeOnTimeout had used the right thread
- if (select.trySelect()) {
- block.startCoroutineCancellable(select.completion) // shall be cancellable while waits for dispatch
+ // Flush all the work that was performed
+ val hasFinished = work.stop(duration)
+
+ if (!isLastSlice) {
+ val candidateSlice = queue.next()
+ val nextSlice =
+ // If our previous slice exceeds its deadline, merge it with the next candidate slice
+ if (hasFinished)
+ candidateSlice
+ else
+ merge(candidateSlice, slice)
+ schedule(nextSlice)
+ } else if (select.trySelect()) {
+ block.startCoroutineCancellable(select.completion)
+ }
}
+
+ // Schedule the flush after the entire slice has finished
+ currentDisposable = delay.invokeOnTimeout(duration, action)
+
+ // Start the slice work
+ currentWork = work
+ work.start()
}
- val disposable = delay.invokeOnTimeout(duration, action)
- val flush = DisposableHandle {
- val end = simulationContext.clock.millis()
-
- // Flush the load if they do not receive a new run call for the same timestamp
- flush = delay.invokeOnTimeout(1, Runnable {
- usageState.value = 0.0
- flush = null
- })
-
- if (!unavailable) {
- // Write back the remaining burst time
- for (i in 0 until min(cpus.size, burst.size)) {
- val usage = min(limit[i], cpus[i].frequency)
- val granted = ceil((end - start) / 1000.0 * usage).toLong()
- burst[i] = max(0, burst[i] - granted)
- }
+ // Schedule the first work
+ if (queue.hasNext()) {
+ schedule(queue.next())
+
+ // A DisposableHandle to flush the work in case the call is cancelled
+ val flush = DisposableHandle {
+ val end = clock.millis()
+ val duration = end - start
+
+ currentWork?.stop(duration)
+ currentDisposable?.dispose()
}
- disposable.dispose()
+ select.disposeOnSelect(flush)
+ } else if (select.trySelect()) {
+ // No work has been given: select immediately
+ block.startCoroutineCancellable(select.completion)
}
+ }
+ }
+ }
+
+ /**
+ * A slice to be processed.
+ */
+ private inner class SliceWork(val slice: ServerContext.Slice) {
+ /**
+ * The duration after which the first processor finishes processing this slice.
+ */
+ public val minExit: Long
+
+ /**
+ * The duration after which the last processor finishes processing this slice.
+ */
+ public val maxExit: Long
+
+ /**
+ * A flag to indicate that the slice will exceed the deadline.
+ */
+ public val exceedsDeadline: Boolean
+ get() = slice.deadline < maxExit
+
+ /**
+ * The total amount of CPU usage.
+ */
+ public val totalUsage: Double
+
+ init {
+ var totalUsage = 0.0
+ var minExit = Long.MAX_VALUE
+ var maxExit = 0L
+
+ // Determine the duration of the first/last CPU to finish
+ for (i in 0 until min(cpus.size, slice.burst.size)) {
+ val cpu = cpus[i]
+ val usage = min(slice.limit[i], cpu.frequency)
+ val cpuDuration = ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds
+
+ totalUsage += usage
+
+ if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst
+ minExit = min(minExit, cpuDuration)
+ maxExit = max(maxExit, cpuDuration)
+ }
+ }
+
+ this.totalUsage = totalUsage
+ this.minExit = minExit
+ this.maxExit = maxExit
+ }
+
+ /**
+ * Indicate that the work on the slice has started.
+ */
+ public fun start() {
+ usageState.value = totalUsage
+ }
- select.disposeOnSelect(flush)
+ /**
+ * Flush the work performed on the slice.
+ */
+ public fun stop(duration: Long): Boolean {
+ var hasFinished = true
+
+ // Only flush the work if the machine is available
+ if (!unavailable) {
+ for (i in 0 until min(cpus.size, slice.burst.size)) {
+ val usage = min(slice.limit[i], cpus[i].frequency)
+ val granted = ceil(duration / 1000.0 * usage).toLong()
+ val res = max(0, slice.burst[i] - granted)
+ slice.burst[i] = res
+
+ if (res != 0L) {
+ hasFinished = false
+ }
+ }
}
+
+ // Reset the usage of the machine since the slice has finished
+ usageState.value = 0.0
+
+ return hasFinished
}
}
}
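
As a sanity check on the exit-time computation in SliceWork above, a standalone sketch with made-up numbers: two cores capped at 2 000 MHz working off bursts of 3 000 and 1 000 units.

import kotlin.math.ceil
import kotlin.math.min

// Made-up numbers to illustrate how SliceWork derives minExit and maxExit.
fun main() {
    val burst = longArrayOf(3_000, 1_000)        // remaining work per core
    val limit = doubleArrayOf(2_000.0, 2_000.0)  // per-core cap in MHz
    val frequency = 2_400.0                      // core frequency in MHz

    val durations = burst.indices.map { i ->
        val usage = min(limit[i], frequency)
        // Same formula as above: work / rate gives seconds, *1000 converts to milliseconds.
        ceil(burst[i] / usage * 1000).toLong()
    }

    val nonZero = durations.filter { it != 0L }
    val minExit = nonZero.minOrNull() ?: Long.MAX_VALUE  // 500 ms: first core finishes
    val maxExit = nonZero.maxOrNull() ?: 0L              // 1500 ms: last core finishes
    println("minExit=$minExit ms, maxExit=$maxExit ms")

    // With TriggerMode.FIRST the driver resumes the caller after minExit,
    // with TriggerMode.LAST after maxExit (or at the deadline, whichever comes first).
}
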
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index ce814dd8..9eab3353 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -265,9 +265,9 @@ class SimpleVirtDriver(
// We run the total burst on the host processor. Note that this call may be cancelled at any moment in
// time, so not all of the burst may be executed.
- val interrupted = select<Boolean> {
+ select<Boolean> {
schedulingQueue.onReceive { schedulingQueue.offer(it); true }
- hostContext.onRun(burst, usage, deadline).invoke { false }
+ hostContext.onRun(ServerContext.Slice(burst, usage, deadline)).invoke { false }
}
val end = clock.millis()
@@ -451,10 +451,9 @@ class SimpleVirtDriver(
events.close()
}
- override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
- require(burst.size == limit.size) { "Array dimensions do not match" }
- this.deadline = deadline
- this.burst = burst
+ override suspend fun run(slice: ServerContext.Slice, triggerMode: ServerContext.TriggerMode) {
+ deadline = slice.deadline
+ burst = slice.burst
val requests = cpus.asSequence()
.take(burst.size)
@@ -463,7 +462,7 @@ class SimpleVirtDriver(
this,
cpu,
burst[i],
- limit[i]
+ slice.limit[i]
)
}
.toList()
@@ -482,6 +481,10 @@ class SimpleVirtDriver(
}
@OptIn(InternalCoroutinesApi::class)
- override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 = TODO()
+ override fun onRun(
+ batch: List<ServerContext.Slice>,
+ triggerMode: ServerContext.TriggerMode,
+ merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
+ ): SelectClause0 = TODO()
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 0fc64373..1b5d62a2 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,12 +25,15 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
@@ -61,11 +64,18 @@ internal class SimpleBareMetalDriverTest {
driver.init()
driver.setImage(image)
val server = driver.start().server!!
+ driver.usage
+ .onEach { println("${simulationContext.clock.millis()} $it") }
+ .launchIn(this)
server.events.collect { event ->
when (event) {
- is ServerEvent.StateChanged -> { println(event); finalState = event.server.state }
+ is ServerEvent.StateChanged -> {
+ println("${simulationContext.clock.millis()} $event");
+ finalState = event.server.state
+ }
}
}
+
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 4f3abc02..318fc279 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -71,6 +71,7 @@ internal class HypervisorTest {
val node = metalDriver.start()
node.server?.events?.onEach { println(it) }?.launchIn(this)
+
delay(5)
val flavor = Flavor(1, 0)