summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-compute/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-19 17:27:01 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 17:58:54 +0200
commit86c65e875b7dde8872dc81a37aa9dca72eee7782 (patch)
tree6249023f8f0d56392400c7ebb72238ee848f740a /opendc-simulator/opendc-simulator-compute/src/main
parentba310a3545c9631e1e4ff61a0a1759228ec5cf63 (diff)
refactor(simulator): Support running workloads without coroutines
This change updates the SimMachine interface to drop the coroutine requirement for running a workload on a machines. Users can now asynchronously start a workload and receive notifications via the workload callbacks. Users still have the possibility to suspend execution during workload execution by using the new `runWorkload` method, which is implemented on top of the new `startWorkload` primitive.
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt69
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt115
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt17
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt81
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt53
10 files changed, 264 insertions, 93 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
new file mode 100644
index 00000000..c23f48dc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.simulator.compute.workload.SimWorkload
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+
+/**
+ * Run the specified [SimWorkload] on this machine and suspend execution util [workload] has finished.
+ *
+ * @param workload The workload to start on the machine.
+ * @param meta The metadata to pass to the workload.
+ * @return A [SimMachineContext] that represents the execution context for the workload.
+ * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed.
+ */
+public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) {
+ return suspendCancellableCoroutine { cont ->
+ cont.invokeOnCancellation { this@runWorkload.cancel() }
+
+ startWorkload(
+ object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ workload.onStart(ctx)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ try {
+ workload.onStop(ctx)
+
+ if (!cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+ },
+ meta
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 5909d980..6a4c594d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
+import mu.KotlinLogging
import org.opendc.simulator.compute.device.SimNetworkAdapter
import org.opendc.simulator.compute.device.SimPeripheral
import org.opendc.simulator.compute.model.MachineModel
@@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter
import org.opendc.simulator.compute.model.StorageDevice
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.*
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
@@ -72,55 +71,19 @@ public abstract class SimAbstractMachine(
public override val peripherals: List<SimPeripheral> = net.map { it as SimNetworkAdapter }
/**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
- /**
* The current active [Context].
*/
private var _ctx: Context? = null
- /**
- * This method is invoked when the machine is started.
- */
- protected open fun onStart(ctx: SimMachineContext) {}
-
- /**
- * This method is invoked when the machine is stopped.
- */
- protected open fun onStop(ctx: SimMachineContext) {
- _ctx = null
- }
-
- /**
- * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- check(!isTerminated) { "Machine is terminated" }
+ override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
check(_ctx == null) { "A machine cannot run concurrently" }
- return suspendCancellableCoroutine { cont ->
- val ctx = Context(meta, cont)
- _ctx = ctx
-
- // Cancel all cpus on cancellation
- cont.invokeOnCancellation { ctx.close() }
-
- engine.batch {
- onStart(ctx)
-
- workload.onStart(ctx)
- }
- }
+ val ctx = Context(workload, meta)
+ ctx.start()
+ return ctx
}
- override fun close() {
- if (isTerminated) {
- return
- }
-
- isTerminated = true
+ override fun cancel() {
_ctx?.close()
}
@@ -130,15 +93,33 @@ public abstract class SimAbstractMachine(
/**
* The execution context in which the workload runs.
+ *
+ * @param workload The workload that is running on the machine.
+ * @param meta The metadata passed to the workload.
*/
- private inner class Context(override val meta: Map<String, Any>, private val cont: Continuation<Unit>) : SimMachineContext {
+ private inner class Context(
+ private val workload: SimWorkload,
+ override val meta: Map<String, Any>
+ ) : SimMachineContext {
/**
* A flag to indicate that the context has been closed.
*/
private var isClosed = false
- override val engine: FlowEngine
- get() = this@SimAbstractMachine.engine
+ override val engine: FlowEngine = this@SimAbstractMachine.engine
+
+ /**
+ * Start this context.
+ */
+ fun start() {
+ try {
+ _ctx = this
+ engine.batch { workload.onStart(this) }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Workload failed during onStart callback" }
+ close()
+ }
+ }
override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
@@ -154,15 +135,43 @@ public abstract class SimAbstractMachine(
}
isClosed = true
+ assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" }
+ _ctx = null
+
+ // Cancel all the resources associated with the machine
+ doCancel()
+
+ try {
+ workload.onStop(this)
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Workload failed during onStop callback" }
+ }
+ }
+
+ /**
+ * Run the stop procedures for the resources associated with the machine.
+ */
+ private fun doCancel() {
engine.batch {
for (cpu in cpus) {
cpu.cancel()
}
- }
- onStop(this)
- cont.resume(Unit)
+ memory.cancel()
+
+ for (ifx in net) {
+ (ifx as NetworkAdapterImpl).disconnect()
+ }
+
+ for (storage in storage) {
+ val impl = storage as StorageDeviceImpl
+ impl.read.cancel()
+ impl.write.cancel()
+ }
+ }
}
+
+ override fun toString(): String = "SimAbstractMachine.Context"
}
/**
@@ -218,4 +227,12 @@ public abstract class SimAbstractMachine(
override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]"
}
+
+ private companion object {
+ /**
+ * The logging instance associated with this class.
+ */
+ @JvmStatic
+ private val logger = KotlinLogging.logger {}
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index ab0b56ae..94581e89 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
/**
* A generic machine that is able to run a [SimWorkload].
*/
-public interface SimMachine : AutoCloseable {
+public interface SimMachine {
/**
* The model of the machine containing its specifications.
*/
@@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable {
public val peripherals: List<SimPeripheral>
/**
- * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * Start the specified [SimWorkload] on this machine.
+ *
+ * @param workload The workload to start on the machine.
+ * @param meta The metadata to pass to the workload.
+ * @return A [SimMachineContext] that represents the execution context for the workload.
+ * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed.
*/
- public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap())
+ public fun startWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()): SimMachineContext
/**
- * Terminate this machine.
+ * Cancel the workload that is currently running on this machine.
+ *
+ * If no workload is active, this operation is a no-op.
*/
- public override fun close()
+ public fun cancel()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index eda59d2d..07465126 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -28,6 +28,7 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceKey
import org.opendc.simulator.flow.mux.FlowMultiplexer
@@ -93,13 +94,20 @@ public abstract class SimAbstractHypervisor(
private val governors = mutableListOf<ScalingGovernor.Logic>()
/* SimHypervisor */
- override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
+ override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
require(canFit(model)) { "Machine does not fit" }
val vm = VirtualMachine(model, interferenceId)
_vms.add(vm)
return vm
}
+ override fun removeMachine(machine: SimVirtualMachine) {
+ if (_vms.remove(machine)) {
+ // This cast must always succeed, since `_vms` only contains `VirtualMachine` types.
+ (machine as VirtualMachine).close()
+ }
+ }
+
/* SimWorkload */
override fun onStart(ctx: SimMachineContext) {
context = ctx
@@ -122,6 +130,8 @@ public abstract class SimAbstractHypervisor(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
private var _cpuCount = 0
private var _cpuCapacity = 0.0
@@ -145,11 +155,16 @@ public abstract class SimAbstractHypervisor(
private inner class VirtualMachine(
model: MachineModel,
interferenceId: String? = null
- ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine {
+ ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable {
+ /**
+ * A flag to indicate that the machine is closed.
+ */
+ private var isClosed = false
+
/**
* The interference key of this virtual machine.
*/
- private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) }
+ private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) }
/**
* The vCPUs of the machine.
@@ -181,36 +196,60 @@ public abstract class SimAbstractHypervisor(
override val cpuUsage: Double
get() = cpus.sumOf(FlowConsumer::rate)
- override fun onStart(ctx: SimMachineContext) {
- val interferenceKey = interferenceKey
- if (interferenceKey != null) {
- interferenceDomain?.join(interferenceKey)
- }
-
- super.onStart(ctx)
+ override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
+ check(!isClosed) { "Machine is closed" }
+
+ return super.startWorkload(
+ object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ joinInterferenceDomain()
+ workload.onStart(ctx)
+ } catch (cause: Throwable) {
+ leaveInterferenceDomain()
+ throw cause
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ leaveInterferenceDomain()
+ workload.onStop(ctx)
+ }
+ },
+ meta
+ )
}
- override fun onStop(ctx: SimMachineContext) {
- super.onStop(ctx)
-
- val interferenceKey = interferenceKey
- if (interferenceKey != null) {
- interferenceDomain?.leave(interferenceKey)
+ override fun close() {
+ if (isClosed) {
+ return
}
- }
- override fun close() {
- super.close()
+ isClosed = true
+ cancel()
for (cpu in cpus) {
cpu.close()
}
+ }
- _vms.remove(this)
+ /**
+ * Join the interference domain of the hypervisor.
+ */
+ private fun joinInterferenceDomain() {
+ val interferenceKey = interferenceKey
+ if (interferenceKey != null) {
+ interferenceDomain?.join(interferenceKey)
+ }
+ }
+ /**
+ * Leave the interference domain of the hypervisor.
+ */
+ private fun leaveInterferenceDomain() {
val interferenceKey = interferenceKey
if (interferenceKey != null) {
- interferenceDomain?.removeKey(interferenceKey)
+ interferenceDomain?.leave(interferenceKey)
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 57d4cf20..a69f419f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload {
* @param model The machine to create.
* @param interferenceId An identifier for the interference model.
*/
- public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
+ public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
+
+ /**
+ * Remove the specified [machine] from the hypervisor.
+ *
+ * @param machine The machine to remove.
+ */
+ public fun removeMachine(machine: SimVirtualMachine)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 99f4a1e1..726d1f56 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -48,5 +48,7 @@ public class SimFlopsWorkload(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 2ef3bc43..8a3f5f84 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -48,5 +48,7 @@ public class SimRuntimeWorkload(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 53c98409..ce04a790 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimTraceWorkload"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index b80665fa..61c6e2ad 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -37,4 +37,11 @@ public interface SimWorkload {
* @param ctx The execution context in which the machine runs.
*/
public fun onStart(ctx: SimMachineContext)
+
+ /**
+ * This method is invoked when the workload is stopped.
+ *
+ * @param ctx The execution context in which the machine runs.
+ */
+ public fun onStop(ctx: SimMachineContext)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index cc4f1f6a..742470a1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
/**
* The resource consumers which represent the lifecycle of the workload.
*/
- private val waiting = mutableSetOf<FlowSource>()
+ private val waiting = HashSet<Wrapper>()
/**
- * Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
+ * Wait for the specified [source] to complete before ending the lifecycle of the workload.
*/
- public fun waitFor(consumer: FlowSource): FlowSource {
- waiting.add(consumer)
- return object : FlowSource by consumer {
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- try {
- consumer.onStop(conn, now, delta)
- } finally {
- complete(consumer)
- }
- }
- override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]"
- }
+ public fun waitFor(source: FlowSource): FlowSource {
+ val wrapper = Wrapper(source)
+ waiting.add(wrapper)
+ return wrapper
}
/**
- * Complete the specified [FlowSource].
+ * Complete the specified [Wrapper].
*/
- private fun complete(consumer: FlowSource) {
- if (waiting.remove(consumer) && waiting.isEmpty()) {
+ private fun complete(wrapper: Wrapper) {
+ if (waiting.remove(wrapper) && waiting.isEmpty()) {
ctx.close()
}
}
+
+ /**
+ * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that is has completed.
+ */
+ private inner class Wrapper(private val delegate: FlowSource) : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ delegate.onStart(conn, now)
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ delegate.onConverge(conn, now, delta)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate.onStop(conn, now, delta)
+ } finally {
+ complete(this)
+ }
+ }
+
+ override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]"
+ }
}