summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-20 01:41:02 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-20 01:41:02 +0200
commit3b3ebd47f433fbf49b4cf476029bb168ca621aac (patch)
treef0e0c76e5e4ba9eec5bd88f87662914db84c1a61 /opendc/opendc-compute/src
parentf24c53dd13c40f46ca03b040bda5fc992d6fa9e3 (diff)
refactor: Provide slice batches as Sequence
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt6
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt29
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt4
4 files changed, 16 insertions, 25 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index 5a9e74a3..f770fa49 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -81,7 +81,7 @@ public interface ServerContext {
* deadline.
*/
public suspend fun run(
- batch: List<Slice>,
+ batch: Sequence<Slice>,
triggerMode: TriggerMode = TriggerMode.FIRST,
merge: (Slice, Slice) -> Slice = { _, r -> r }
) = select<Unit> { onRun(batch, triggerMode, merge).invoke {} }
@@ -98,7 +98,7 @@ public interface ServerContext {
* @param triggerMode The trigger condition to resume execution.
*/
public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 =
- onRun(listOf(slice), triggerMode)
+ onRun(sequenceOf(slice), triggerMode)
/**
* Ask the processors cores to run the specified [batch] of work slices and select when the trigger condition is met
@@ -117,7 +117,7 @@ public interface ServerContext {
* deadline.
*/
public fun onRun(
- batch: List<Slice>,
+ batch: Sequence<Slice>,
triggerMode: TriggerMode = TriggerMode.FIRST,
merge: (Slice, Slice) -> Slice = { _, r -> r }
): SelectClause0
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index c0abdd76..c615d865 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -3,10 +3,7 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.ensureActive
import java.util.UUID
-import kotlin.coroutines.coroutineContext
import kotlin.math.min
class VmImage(
@@ -20,23 +17,17 @@ class VmImage(
override suspend fun invoke(ctx: ServerContext) {
val clock = simulationContext.clock
- val job = coroutineContext[Job]!!
-
- for (fragments in flopsHistory.chunked(1024)) {
- job.ensureActive()
-
- var offset = clock.millis()
-
- val batch = fragments.map { fragment ->
- val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
- val burst = LongArray(cores) { fragment.flops / cores }
- val usage = DoubleArray(cores) { fragment.usage / cores }
- offset += fragment.duration
- ServerContext.Slice(burst, usage, offset)
- }
-
- ctx.run(batch)
+ var offset = clock.millis()
+
+ val batch = flopsHistory.map { fragment ->
+ val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
+ val burst = LongArray(cores) { fragment.flops / cores }
+ val usage = DoubleArray(cores) { fragment.usage / cores }
+ offset += fragment.duration
+ ServerContext.Slice(burst, usage, offset)
}
+
+ ctx.run(batch)
}
override fun toString(): String = "VmImage(uid=$uid, name=$name, cores=$maxCores, requiredMemory=$requiredMemory)"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 48937001..30981e6e 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -261,7 +261,7 @@ public class SimpleBareMetalDriver(
@OptIn(InternalCoroutinesApi::class)
override fun onRun(
- batch: List<ServerContext.Slice>,
+ batch: Sequence<ServerContext.Slice>,
triggerMode: ServerContext.TriggerMode,
merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
): SelectClause0 {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 6c47ade7..6e335797 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -406,7 +406,7 @@ class SimpleVirtDriver(
/**
* Schedule the given slices on this vCPU, replacing the existing slices.
*/
- fun schedule(slices: List<ServerContext.Slice>) {
+ fun schedule(slices: Sequence<ServerContext.Slice>) {
queue = slices.iterator()
if (queue.hasNext()) {
@@ -579,7 +579,7 @@ class SimpleVirtDriver(
@OptIn(InternalCoroutinesApi::class)
override fun onRun(
- batch: List<ServerContext.Slice>,
+ batch: Sequence<ServerContext.Slice>,
triggerMode: ServerContext.TriggerMode,
merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice
): SelectClause0 = object : SelectClause0 {