summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-28 16:46:47 +0200
committerGitHub <noreply@github.com>2022-10-28 16:46:47 +0200
commitb96acc687f59b698fbc4d4c984d77b008cd4051b (patch)
treeff4d99454259b92ec4484433bb716acd6319faa1
parentc4cfb6f6e0507f335fd88935857f20e88c34abd0 (diff)
parent8bf940eb7b59b5e5e326cfc06d51bdb54393f33b (diff)
Support custom start-up and clean-up time for VMs (#112)
This pull request implements customizable startup and clean-up time for virtual machine. We achieve this by implementing `SimWorkload` chaining and modelling start-up and clean-up time using `SimWorkload` as well. Implements #33 ## Implementation Notes :hammer_and_pick: * Store method parameters in class files * Add completion parameter to startWorkload * Provide workload constructors in SimWorkloads * Add support for resetting machine context * Add support for chaining workloads * Use workload chaining for boot delay * Model host boot time * Do not suspend on guest start * Use static logger field ## Breaking API Changes :warning: * `SimMachine#startWorkload` now has a third parameter `completion` which is invoked when the workload finishes executing (either due to failure or success). * `SimFlopsWorkload` and `SimRuntimeWorkload` can be instantiated via `SimWorkloads`.
-rw-r--r--buildSrc/src/main/kotlin/java-conventions.gradle.kts4
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt11
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt64
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt115
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java)46
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt81
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt87
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt22
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt3
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt4
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java77
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java19
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java182
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java70
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt29
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt176
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt2
27 files changed, 767 insertions, 294 deletions
diff --git a/buildSrc/src/main/kotlin/java-conventions.gradle.kts b/buildSrc/src/main/kotlin/java-conventions.gradle.kts
index a639a9e1..8857d4ab 100644
--- a/buildSrc/src/main/kotlin/java-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/java-conventions.gradle.kts
@@ -34,3 +34,7 @@ java {
sourceCompatibility = Libs.jvmTarget
targetCompatibility = Libs.jvmTarget
}
+
+tasks.withType<JavaCompile> {
+ options.compilerArgs.add("-parameters")
+}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index fad8757e..efcc0f2c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -70,11 +70,8 @@ public interface Host {
/**
* Register the specified [instance][server] on the host.
- *
- * Once the method returns, the instance should be running if [start] is true or else the instance should be
- * stopped.
*/
- public suspend fun spawn(server: Server, start: Boolean = true)
+ public fun spawn(server: Server)
/**
* Determine whether the specified [instance][server] exists on the host.
@@ -86,19 +83,19 @@ public interface Host {
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
- public suspend fun start(server: Server)
+ public fun start(server: Server)
/**
* Stop the server [instance][server] if it is currently running on this host.
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
- public suspend fun stop(server: Server)
+ public fun stop(server: Server)
/**
* Delete the specified [instance][server] on this host and cleanup all resources associated with it.
*/
- public suspend fun delete(server: Server)
+ public fun delete(server: Server)
/**
* Add a [HostListener] to this host.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 0fe016aa..b377c3e3 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,11 +22,6 @@
package org.opendc.compute.service.internal
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
@@ -53,23 +48,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use in the service.
+ * @param coroutineContext The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
- private val context: CoroutineContext,
+ coroutineContext: CoroutineContext,
private val clock: Clock,
private val scheduler: ComputeScheduler,
schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -115,6 +105,9 @@ internal class ComputeServiceImpl(
private val serverById = mutableMapOf<UUID, InternalServer>()
override val servers: MutableList<Server> = mutableListOf()
+ override val hosts: Set<Host>
+ get() = hostToView.keys
+
private var maxCores = 0
private var maxMemory = 0L
private var _attemptsSuccess = 0L
@@ -122,17 +115,15 @@ internal class ComputeServiceImpl(
private var _attemptsError = 0L
private var _serversPending = 0
private var _serversActive = 0
+ private var isClosed = false
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
-
- override val hosts: Set<Host>
- get() = hostToView.keys
+ private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
override fun newClient(): ComputeClient {
- check(scope.isActive) { "Service is already closed" }
+ check(!isClosed) { "Service is already closed" }
return object : ComputeClient {
private var isClosed: Boolean = false
@@ -285,7 +276,12 @@ internal class ComputeServiceImpl(
}
override fun close() {
- scope.cancel()
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ pacer.cancel()
}
override fun getSchedulerStats(): SchedulerStats {
@@ -379,29 +375,23 @@ internal class ComputeServiceImpl(
logger.info { "Assigned server $server to host $host." }
- // Speculatively update the hypervisor view information to prevent other images in the queue from
- // deciding on stale values.
- hv.instanceCount++
- hv.provisionedCores += server.flavor.cpuCount
- hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
+ try {
+ server.host = host
- scope.launch {
- try {
- server.host = host
- host.spawn(server)
- activeServers[server] = host
+ host.spawn(server)
+ host.start(server)
- _serversActive++
- _attemptsSuccess++
- } catch (e: Throwable) {
- logger.error(e) { "Failed to deploy VM" }
+ _serversActive++
+ _attemptsSuccess++
- hv.instanceCount--
- hv.provisionedCores -= server.flavor.cpuCount
- hv.availableMemory += server.flavor.memorySize
+ hv.instanceCount++
+ hv.provisionedCores += server.flavor.cpuCount
+ hv.availableMemory -= server.flavor.memorySize
- _attemptsError++
- }
+ activeServers[server] = host
+ } catch (e: Throwable) {
+ logger.error(e) { "Failed to deploy VM" }
+ _attemptsError++
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 73e9b3d7..c18709f3 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -348,7 +348,7 @@ internal class ComputeServiceTest {
// Start server
server.start()
delay(5L * 60 * 1000)
- coVerify { host.spawn(capture(slot), true) }
+ coVerify { host.spawn(capture(slot)) }
listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) }
@@ -376,7 +376,7 @@ internal class ComputeServiceTest {
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
- coEvery { host.spawn(any(), true) } throws IllegalStateException()
+ coEvery { host.spawn(any()) } throws IllegalStateException()
service.addHost(host)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index c07649bd..b3e56f38 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.simulator
-import kotlinx.coroutines.yield
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
@@ -34,6 +33,7 @@ import org.opendc.compute.service.driver.telemetry.GuestCpuStats
import org.opendc.compute.service.driver.telemetry.GuestSystemStats
import org.opendc.compute.service.driver.telemetry.HostCpuStats
import org.opendc.compute.service.driver.telemetry.HostSystemStats
+import org.opendc.compute.simulator.internal.DefaultWorkloadMapper
import org.opendc.compute.simulator.internal.Guest
import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.SimBareMetalMachine
@@ -44,30 +44,37 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.flow2.FlowGraph
+import org.opendc.simulator.compute.workload.SimWorkloads
+import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.UUID
-import kotlin.coroutines.CoroutineContext
+import java.util.function.Supplier
/**
- * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
+ * A [Host] implementation that simulates virtual machines on a physical machine using [SimHypervisor].
+ *
+ * @param uid The unique identifier of the host.
+ * @param name The name of the host.
+ * @param meta The metadata of the host.
+ * @param clock The (virtual) clock used to track time.
+ * @param machine The [SimBareMetalMachine] on which the host runs.
+ * @param hypervisor The [SimHypervisor] to run on top of the machine.
+ * @param mapper A [SimWorkloadMapper] to map a [Server] to a [SimWorkload].
+ * @param bootModel A [Supplier] providing the [SimWorkload] to execute during the boot procedure of the hypervisor.
+ * @param optimize A flag to indicate to optimize the machine models of the virtual machines.
*/
public class SimHost(
override val uid: UUID,
override val name: String,
override val meta: Map<String, Any>,
- private val context: CoroutineContext,
- graph: FlowGraph,
+ private val clock: Clock,
private val machine: SimBareMetalMachine,
private val hypervisor: SimHypervisor,
- private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
+ private val mapper: SimWorkloadMapper = DefaultWorkloadMapper,
+ private val bootModel: Supplier<SimWorkload?> = Supplier { null },
private val optimize: Boolean = false
) : Host, AutoCloseable {
- /**
- * The clock instance used by the host.
- */
- private val clock = graph.engine.clock
/**
* The event listeners registered with this host.
@@ -124,13 +131,12 @@ public class SimHost(
return sufficientMemory && enoughCpus && canFit
}
- override suspend fun spawn(server: Server, start: Boolean) {
- val guest = guests.computeIfAbsent(server) { key ->
+ override fun spawn(server: Server) {
+ guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
val machine = hypervisor.newMachine(key.flavor.toMachineModel())
val newGuest = Guest(
- context,
clock,
this,
hypervisor,
@@ -143,27 +149,23 @@ public class SimHost(
_guests.add(newGuest)
newGuest
}
-
- if (start) {
- guest.start()
- }
}
override fun contains(server: Server): Boolean {
return server in guests
}
- override suspend fun start(server: Server) {
+ override fun start(server: Server) {
val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
guest.start()
}
- override suspend fun stop(server: Server) {
+ override fun stop(server: Server) {
val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
guest.stop()
}
- override suspend fun delete(server: Server) {
+ override fun delete(server: Server) {
val guest = guests[server] ?: return
guest.delete()
}
@@ -251,7 +253,7 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
- public suspend fun fail() {
+ public fun fail() {
reset(HostState.ERROR)
for (guest in _guests) {
@@ -259,55 +261,54 @@ public class SimHost(
}
}
- public suspend fun recover() {
+ public fun recover() {
updateUptime()
launch()
-
- // Wait for the hypervisor to launch before recovering the guests
- yield()
-
- for (guest in _guests) {
- guest.recover()
- }
}
/**
- * The [Job] that represents the machine running the hypervisor.
+ * The [SimMachineContext] that represents the machine running the hypervisor.
*/
- private var _ctx: SimMachineContext? = null
+ private var ctx: SimMachineContext? = null
/**
* Launch the hypervisor.
*/
private fun launch() {
- check(_ctx == null) { "Concurrent hypervisor running" }
-
- // Launch hypervisor onto machine
- _ctx = machine.startWorkload(
- object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- try {
- _bootTime = clock.instant()
- _state = HostState.UP
- hypervisor.onStart(ctx)
- } catch (cause: Throwable) {
- _state = HostState.ERROR
- _ctx = null
- throw cause
+ check(ctx == null) { "Concurrent hypervisor running" }
+
+ val bootWorkload = bootModel.get()
+ val hypervisor = hypervisor
+ val hypervisorWorkload = object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ _bootTime = clock.instant()
+ _state = HostState.UP
+ hypervisor.onStart(ctx)
+
+ // Recover the guests that were running on the hypervisor.
+ for (guest in _guests) {
+ guest.recover()
}
+ } catch (cause: Throwable) {
+ _state = HostState.ERROR
+ throw cause
}
+ }
- override fun onStop(ctx: SimMachineContext) {
- try {
- hypervisor.onStop(ctx)
- } finally {
- _ctx = null
- }
- }
- },
- emptyMap()
- )
+ override fun onStop(ctx: SimMachineContext) {
+ hypervisor.onStop(ctx)
+ }
+ }
+
+ val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload
+
+ // Launch hypervisor onto machine
+ ctx = machine.startWorkload(workload, emptyMap()) { cause ->
+ _state = if (cause != null) HostState.ERROR else HostState.DOWN
+ ctx = null
+ }
}
/**
@@ -317,7 +318,7 @@ public class SimHost(
updateUptime()
// Stop the hypervisor
- _ctx?.shutdown()
+ ctx?.shutdown()
_state = state
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt
index 7082c5cf..83baa61a 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.simulator
+import org.opendc.compute.api.Image
import org.opendc.compute.api.Server
import org.opendc.simulator.compute.workload.SimWorkload
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt
index f0e2561f..c5293a8d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt
@@ -20,41 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload;
+package org.opendc.compute.simulator.internal
-import java.util.HashSet;
-import org.opendc.simulator.compute.SimMachineContext;
+import org.opendc.compute.api.Server
+import org.opendc.compute.simulator.SimMetaWorkloadMapper
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.compute.workload.SimWorkloads
+import java.time.Duration
/**
- * A helper class to manage the lifecycle of a {@link SimWorkload}.
+ * A [SimWorkloadMapper] to introduces a boot delay of 1 ms. This object exists to retain the old behavior while
+ * introducing the possibility of adding custom boot delays.
*/
-public final class SimWorkloadLifecycle {
- private final SimMachineContext ctx;
- private final HashSet<Runnable> waiting = new HashSet<>();
+internal object DefaultWorkloadMapper : SimWorkloadMapper {
+ private val delegate = SimMetaWorkloadMapper()
- /**
- * Construct a {@link SimWorkloadLifecycle} instance.
- *
- * @param ctx The {@link SimMachineContext} of the workload.
- */
- public SimWorkloadLifecycle(SimMachineContext ctx) {
- this.ctx = ctx;
- }
-
- /**
- * Register a "completer" callback that must be invoked before ending the lifecycle of the workload.
- */
- public Runnable newCompleter() {
- Runnable completer = new Runnable() {
- @Override
- public void run() {
- final HashSet<Runnable> waiting = SimWorkloadLifecycle.this.waiting;
- if (waiting.remove(this) && waiting.isEmpty()) {
- ctx.shutdown();
- }
- }
- };
- waiting.add(completer);
- return completer;
+ override fun createWorkload(server: Server): SimWorkload {
+ val workload = delegate.createWorkload(server)
+ val bootWorkload = SimWorkloads.runtime(Duration.ofMillis(1), 0.8)
+ return SimWorkloads.chain(bootWorkload, workload)
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 790d8047..ca947625 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -22,12 +22,6 @@
package org.opendc.compute.simulator.internal
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
@@ -35,20 +29,17 @@ import org.opendc.compute.service.driver.telemetry.GuestCpuStats
import org.opendc.compute.service.driver.telemetry.GuestSystemStats
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimVirtualMachine
-import org.opendc.simulator.compute.runWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Clock
import java.time.Duration
import java.time.Instant
-import kotlin.coroutines.CoroutineContext
/**
* A virtual machine instance that is managed by a [SimHost].
*/
internal class Guest(
- context: CoroutineContext,
private val clock: Clock,
val host: SimHost,
private val hypervisor: SimHypervisor,
@@ -58,35 +49,26 @@ internal class Guest(
val machine: SimVirtualMachine
) {
/**
- * The [CoroutineScope] of the guest.
- */
- private val scope: CoroutineScope = CoroutineScope(context + Job())
-
- /**
- * The logger instance of this guest.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The state of the [Guest].
*
* [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
* a server.
*/
var state: ServerState = ServerState.TERMINATED
+ private set
/**
* Start the guest.
*/
- suspend fun start() {
+ fun start() {
when (state) {
ServerState.TERMINATED, ServerState.ERROR -> {
- logger.info { "User requested to start server ${server.uid}" }
+ LOGGER.info { "User requested to start server ${server.uid}" }
doStart()
}
ServerState.RUNNING -> return
ServerState.DELETED -> {
- logger.warn { "User tried to start deleted server" }
+ LOGGER.warn { "User tried to start deleted server" }
throw IllegalArgumentException("Server is deleted")
}
else -> assert(false) { "Invalid state transition" }
@@ -96,7 +78,7 @@ internal class Guest(
/**
* Stop the guest.
*/
- suspend fun stop() {
+ fun stop() {
when (state) {
ServerState.RUNNING -> doStop(ServerState.TERMINATED)
ServerState.ERROR -> doRecover()
@@ -111,12 +93,11 @@ internal class Guest(
* This operation will stop the guest if it is running on the host and remove all resources associated with the
* guest.
*/
- suspend fun delete() {
+ fun delete() {
stop()
state = ServerState.DELETED
hypervisor.removeMachine(machine)
- scope.cancel()
}
/**
@@ -124,7 +105,7 @@ internal class Guest(
*
* This operation forcibly stops the guest and puts the server into an error state.
*/
- suspend fun fail() {
+ fun fail() {
if (state != ServerState.RUNNING) {
return
}
@@ -135,7 +116,7 @@ internal class Guest(
/**
* Recover the guest if it is in an error state.
*/
- suspend fun recover() {
+ fun recover() {
if (state != ServerState.ERROR) {
return
}
@@ -175,37 +156,34 @@ internal class Guest(
}
/**
- * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ * The [SimMachineContext] representing the current active virtual machine instance or `null` if no virtual machine
+ * is active.
*/
- private var job: Job? = null
+ private var ctx: SimMachineContext? = null
/**
* Launch the guest on the simulated
*/
- private suspend fun doStart() {
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- val job = scope.launch { runMachine(workload) }
- this.job = job
+ private fun doStart() {
+ assert(ctx == null) { "Concurrent job running" }
- state = ServerState.RUNNING
onStart()
- job.invokeOnCompletion { cause ->
- this.job = null
- onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ val workload = mapper.createWorkload(server)
+ val meta = mapOf("driver" to host, "server" to server) + server.meta
+ ctx = machine.startWorkload(workload, meta) { cause ->
+ onStop(if (cause != null) ServerState.ERROR else ServerState.TERMINATED)
+ ctx = null
}
}
/**
* Attempt to stop the server and put it into [target] state.
*/
- private suspend fun doStop(target: ServerState) {
- assert(job != null) { "Invalid job state" }
- val job = job ?: return
- job.cancel()
- job.join()
+ private fun doStop(target: ServerState) {
+ assert(ctx != null) { "Invalid job state" }
+ val ctx = ctx ?: return
+ ctx.shutdown()
state = target
}
@@ -218,14 +196,6 @@ internal class Guest(
}
/**
- * Converge the process that models the virtual machine lifecycle as a coroutine.
- */
- private suspend fun runMachine(workload: SimWorkload) {
- delay(1) // TODO Introduce model for boot time
- machine.runWorkload(workload, mapOf("driver" to host, "server" to server) + server.meta)
- }
-
- /**
* This method is invoked when the guest was started on the host and has booted into a running state.
*/
private fun onStart() {
@@ -264,4 +234,9 @@ internal class Guest(
_downtime += duration
}
}
+
+ private companion object {
+ @JvmStatic
+ private val LOGGER = KotlinLogging.logger {}
+ }
}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index a5999bcd..fc581d3e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,7 +24,6 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -70,6 +69,73 @@ internal class SimHostTest {
}
/**
+ * Test a single virtual machine hosted by the hypervisor.
+ */
+ @Test
+ fun testSingle() = runSimulation {
+ val duration = 5 * 60L
+
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(graph, machineModel)
+ val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
+
+ val host = SimHost(
+ uid = UUID.randomUUID(),
+ name = "test",
+ meta = emptyMap(),
+ clock,
+ machine,
+ hypervisor
+ )
+ val vmImage = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2)
+ ).createWorkload(1)
+ )
+ )
+
+ val flavor = MockFlavor(2, 0)
+
+ suspendCancellableCoroutine { cont ->
+ host.addListener(object : HostListener {
+ private var finished = 0
+
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED && ++finished == 1) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage)
+ host.spawn(server)
+ host.start(server)
+ }
+
+ // Ensure last cycle is collected
+ delay(1000L * duration)
+ host.close()
+
+ val cpuStats = host.getCpuStats()
+
+ assertAll(
+ { assertEquals(639, cpuStats.activeTime, "Active time does not match") },
+ { assertEquals(2360, cpuStats.idleTime, "Idle time does not match") },
+ { assertEquals(56, cpuStats.stealTime, "Steal time does not match") },
+ { assertEquals(1500001, clock.millis()) }
+ )
+ }
+
+ /**
* Test overcommitting of resources by the hypervisor.
*/
@Test
@@ -86,8 +152,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- coroutineContext,
- graph,
+ clock,
machine,
hypervisor
)
@@ -123,9 +188,6 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
coroutineScope {
- launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
- launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
-
suspendCancellableCoroutine { cont ->
host.addListener(object : HostListener {
private var finished = 0
@@ -136,6 +198,13 @@ internal class SimHostTest {
}
}
})
+ val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA)
+ host.spawn(serverA)
+ val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB)
+ host.spawn(serverB)
+
+ host.start(serverA)
+ host.start(serverB)
}
}
@@ -169,8 +238,7 @@ internal class SimHostTest {
uid = UUID.randomUUID(),
name = "test",
meta = emptyMap(),
- coroutineContext,
- graph,
+ clock,
machine,
hypervisor
)
@@ -193,12 +261,13 @@ internal class SimHostTest {
coroutineScope {
host.spawn(server)
+ host.start(server)
delay(5000L)
host.fail()
delay(duration * 1000)
host.recover()
- suspendCancellableCoroutine<Unit> { cont ->
+ suspendCancellableCoroutine { cont ->
host.addListener(object : HostListener {
override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
if (newState == ServerState.TERMINATED) {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 47058caa..77b0d09f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -120,9 +120,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitor.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.serversPending, "No VM should not be in the queue") },
- { assertEquals(223394204, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(66976984, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(3160316, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(223394101, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977086, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160276, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
{ assertEquals(5.84093E9, monitor.energyUsage, 1E4) { "Incorrect power draw" } }
)
@@ -160,8 +160,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10999504, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(9741294, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(10999514, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9741285, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(7.0116E8, monitor.energyUsage, 1E4) { "Incorrect power draw" } }
@@ -199,10 +199,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6027979, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(14712820, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(12532979, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(445913, monitor.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6028018, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14712781, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(12532934, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(424267, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -229,8 +229,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10085103, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(8539212, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(10085111, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(8539204, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(2328039558, monitor.uptime) { "Uptime incorrect" } }
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
index 292be929..e224fb84 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
@@ -58,8 +58,7 @@ public class HostsProvisioningStep internal constructor(
spec.uid,
spec.name,
spec.meta,
- ctx.coroutineContext,
- graph,
+ ctx.clock,
machine,
hypervisor,
optimize = optimize
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
index 4dc3a775..b622362a 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
@@ -27,7 +27,7 @@ package org.opendc.experiments.workflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.simulator.compute.workload.SimWorkloads
import org.opendc.trace.Trace
import org.opendc.trace.conv.TABLE_TASKS
import org.opendc.trace.conv.TASK_ALLOC_NCPUS
@@ -74,7 +74,7 @@ public fun Trace.toJobs(): List<Job> {
val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
val runtime = reader.getDuration(TASK_RUNTIME)!!
val flops: Long = 4000 * runtime.seconds * grantedCpus
- val workload = SimFlopsWorkload(flops, 1.0)
+ val workload = SimWorkloads.flops(flops, 1.0)
val task = Task(
UUID(0L, id),
"<unnamed>",
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index aac54f57..6baee7ea 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -40,8 +40,8 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.compute.workload.SimWorkloads
import org.opendc.simulator.kotlin.runSimulation
import java.time.Duration
import java.util.Random
@@ -66,7 +66,7 @@ internal class SimFaaSServiceTest {
@Test
fun testSmoke() = runSimulation {
val random = Random(0)
- val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000, 1.0) {
+ val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimWorkloads.runtime(1000, 1.0) {
override suspend fun invoke() {
delay(random.nextInt(1000).toLong())
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java
index cf5aed03..d968d884 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java
@@ -25,6 +25,7 @@ package org.opendc.simulator.compute;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import org.opendc.simulator.compute.device.SimNetworkAdapter;
import org.opendc.simulator.compute.model.MachineModel;
import org.opendc.simulator.compute.model.MemoryUnit;
@@ -62,12 +63,13 @@ public abstract class SimAbstractMachine implements SimMachine {
}
@Override
- public final SimMachineContext startWorkload(SimWorkload workload, Map<String, Object> meta) {
+ public final SimMachineContext startWorkload(
+ SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion) {
if (activeContext != null) {
throw new IllegalStateException("A machine cannot run multiple workloads concurrently");
}
- final Context ctx = createContext(workload, new HashMap<>(meta));
+ final Context ctx = createContext(workload, new HashMap<>(meta), completion);
ctx.start();
return ctx;
}
@@ -83,10 +85,12 @@ public abstract class SimAbstractMachine implements SimMachine {
/**
* Construct a new {@link Context} instance representing the active execution.
*
- * @param workload The workload to start on the machine.
- * @param meta The metadata to pass to the workload.
+ * @param workload The workload to start on the machine.
+ * @param meta The metadata to pass to the workload.
+ * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload.
*/
- protected abstract Context createContext(SimWorkload workload, Map<String, Object> meta);
+ protected abstract Context createContext(
+ SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion);
/**
* Return the active {@link Context} instance (if any).
@@ -102,7 +106,9 @@ public abstract class SimAbstractMachine implements SimMachine {
private final SimAbstractMachine machine;
private final SimWorkload workload;
private final Map<String, Object> meta;
+ private final Consumer<Exception> completion;
private boolean isClosed;
+ private Exception cause;
/**
* Construct a new {@link Context} instance.
@@ -110,11 +116,17 @@ public abstract class SimAbstractMachine implements SimMachine {
* @param machine The {@link SimAbstractMachine} to which the context belongs.
* @param workload The {@link SimWorkload} to which the context belongs.
* @param meta The metadata passed to the context.
+ * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload.
*/
- public Context(SimAbstractMachine machine, SimWorkload workload, Map<String, Object> meta) {
+ public Context(
+ SimAbstractMachine machine,
+ SimWorkload workload,
+ Map<String, Object> meta,
+ Consumer<Exception> completion) {
this.machine = machine;
this.workload = workload;
this.meta = meta;
+ this.completion = completion;
}
@Override
@@ -123,6 +135,28 @@ public abstract class SimAbstractMachine implements SimMachine {
}
@Override
+ public void reset() {
+ final FlowGraph graph = getMemory().getInput().getGraph();
+
+ for (SimProcessingUnit cpu : getCpus()) {
+ final Inlet inlet = cpu.getInput();
+ graph.disconnect(inlet);
+ }
+
+ graph.disconnect(getMemory().getInput());
+
+ for (SimNetworkInterface ifx : getNetworkInterfaces()) {
+ ((NetworkAdapter) ifx).disconnect();
+ }
+
+ for (SimStorageInterface storage : getStorageInterfaces()) {
+ StorageDevice impl = (StorageDevice) storage;
+ graph.disconnect(impl.getRead());
+ graph.disconnect(impl.getWrite());
+ }
+ }
+
+ @Override
public final void shutdown() {
if (isClosed) {
return;
@@ -136,11 +170,19 @@ public abstract class SimAbstractMachine implements SimMachine {
// Cancel all the resources associated with the machine
doCancel();
+ Exception e = this.cause;
+
try {
workload.onStop(this);
} catch (Exception cause) {
- LOGGER.warn("Workload failed during onStop callback", cause);
+ if (e != null) {
+ e.addSuppressed(cause);
+ } else {
+ e = cause;
+ }
}
+
+ completion.accept(e);
}
/**
@@ -151,7 +193,7 @@ public abstract class SimAbstractMachine implements SimMachine {
machine.activeContext = this;
workload.onStart(this);
} catch (Exception cause) {
- LOGGER.warn("Workload failed during onStart callback", cause);
+ this.cause = cause;
shutdown();
}
}
@@ -160,24 +202,7 @@ public abstract class SimAbstractMachine implements SimMachine {
* Run the stop procedures for the resources associated with the machine.
*/
protected void doCancel() {
- final FlowGraph graph = getMemory().getInput().getGraph();
-
- for (SimProcessingUnit cpu : getCpus()) {
- final Inlet inlet = cpu.getInput();
- graph.disconnect(inlet);
- }
-
- graph.disconnect(getMemory().getInput());
-
- for (SimNetworkInterface ifx : getNetworkInterfaces()) {
- ((NetworkAdapter) ifx).disconnect();
- }
-
- for (SimStorageInterface storage : getStorageInterfaces()) {
- StorageDevice impl = (StorageDevice) storage;
- graph.disconnect(impl.getRead());
- graph.disconnect(impl.getWrite());
- }
+ reset();
}
@Override
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java
index aa7502d6..11356eb2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import org.opendc.simulator.compute.device.SimPeripheral;
import org.opendc.simulator.compute.model.MachineModel;
import org.opendc.simulator.compute.model.ProcessingUnit;
@@ -39,7 +40,7 @@ import org.opendc.simulator.flow2.Inlet;
*
* <p>
* A {@link SimBareMetalMachine} is a stateful object, and you should be careful when operating this object concurrently. For
- * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map)}.
+ * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map, Consumer)} )}.
*/
public final class SimBareMetalMachine extends SimAbstractMachine {
/**
@@ -192,8 +193,9 @@ public final class SimBareMetalMachine extends SimAbstractMachine {
}
@Override
- protected SimAbstractMachine.Context createContext(SimWorkload workload, Map<String, Object> meta) {
- return new Context(this, workload, meta);
+ protected SimAbstractMachine.Context createContext(
+ SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion) {
+ return new Context(this, workload, meta, completion);
}
/**
@@ -206,8 +208,12 @@ public final class SimBareMetalMachine extends SimAbstractMachine {
private final List<NetworkAdapter> net;
private final List<StorageDevice> disk;
- private Context(SimBareMetalMachine machine, SimWorkload workload, Map<String, Object> meta) {
- super(machine, workload, meta);
+ private Context(
+ SimBareMetalMachine machine,
+ SimWorkload workload,
+ Map<String, Object> meta,
+ Consumer<Exception> completion) {
+ super(machine, workload, meta, completion);
this.graph = machine.graph;
this.cpus = machine.cpus;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java
index 59599875..1f86aa02 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java
@@ -24,6 +24,7 @@ package org.opendc.simulator.compute;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import org.opendc.simulator.compute.device.SimPeripheral;
import org.opendc.simulator.compute.model.MachineModel;
import org.opendc.simulator.compute.workload.SimWorkload;
@@ -47,10 +48,11 @@ public interface SimMachine {
*
* @param workload The workload to start on the machine.
* @param meta The metadata to pass to the workload.
+ * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload.
* @return A {@link SimMachineContext} that represents the execution context for the workload.
* @throws IllegalStateException if a workload is already active on the machine or if the machine is closed.
*/
- SimMachineContext startWorkload(SimWorkload workload, Map<String, Object> meta);
+ SimMachineContext startWorkload(SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion);
/**
* Cancel the active workload on this machine (if any).
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java
index f6a3bd38..5d08e2b7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java
@@ -68,6 +68,11 @@ public interface SimMachineContext {
List<? extends SimStorageInterface> getStorageInterfaces();
/**
+ * Reset all resources of the machine.
+ */
+ void reset();
+
+ /**
* Shutdown the workload.
*/
void shutdown();
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
index 6e295837..f03a0c20 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SplittableRandom;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.opendc.simulator.compute.SimAbstractMachine;
import org.opendc.simulator.compute.SimMachine;
@@ -471,7 +472,8 @@ public final class SimHypervisor implements SimWorkload {
}
@Override
- protected Context createContext(SimWorkload workload, Map<String, Object> meta) {
+ protected Context createContext(
+ SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion) {
if (isClosed) {
throw new IllegalStateException("Virtual machine does not exist anymore");
}
@@ -482,7 +484,15 @@ public final class SimHypervisor implements SimWorkload {
}
return new VmContext(
- context, this, random, interferenceDomain, counters, SimHypervisor.this.counters, workload, meta);
+ context,
+ this,
+ random,
+ interferenceDomain,
+ counters,
+ SimHypervisor.this.counters,
+ workload,
+ meta,
+ completion);
}
@Override
@@ -538,8 +548,9 @@ public final class SimHypervisor implements SimWorkload {
VmCounters vmCounters,
HvCounters hvCounters,
SimWorkload workload,
- Map<String, Object> meta) {
- super(machine, workload, meta);
+ Map<String, Object> meta,
+ Consumer<Exception> completion) {
+ super(machine, workload, meta, completion);
this.context = context;
this.random = random;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
new file mode 100644
index 00000000..9304122a
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload;
+
+import java.util.List;
+import java.util.Map;
+import org.opendc.simulator.compute.SimMachineContext;
+import org.opendc.simulator.compute.SimMemory;
+import org.opendc.simulator.compute.SimNetworkInterface;
+import org.opendc.simulator.compute.SimProcessingUnit;
+import org.opendc.simulator.compute.SimStorageInterface;
+import org.opendc.simulator.flow2.FlowGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link SimWorkload} that composes two {@link SimWorkload}s.
+ */
+final class SimChainWorkload implements SimWorkload {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class);
+
+ private final SimWorkload[] workloads;
+ private int activeWorkloadIndex;
+
+ private Context activeContext;
+
+ /**
+ * Construct a {@link SimChainWorkload} instance.
+ *
+ * @param workloads The workloads to chain.
+ */
+ SimChainWorkload(SimWorkload... workloads) {
+ this.workloads = workloads;
+ }
+
+ @Override
+ public void onStart(SimMachineContext ctx) {
+ final SimWorkload[] workloads = this.workloads;
+ final int activeWorkloadIndex = this.activeWorkloadIndex;
+
+ if (activeWorkloadIndex >= workloads.length) {
+ return;
+ }
+
+ final Context context = new Context(ctx);
+ activeContext = context;
+
+ if (!context.doStart(workloads[activeWorkloadIndex])) {
+ ctx.shutdown();
+ }
+ }
+
+ @Override
+ public void onStop(SimMachineContext ctx) {
+ final SimWorkload[] workloads = this.workloads;
+ final int activeWorkloadIndex = this.activeWorkloadIndex;
+
+ if (activeWorkloadIndex >= workloads.length) {
+ return;
+ }
+
+ final Context context = activeContext;
+ activeContext = null;
+
+ context.doStop(workloads[activeWorkloadIndex]);
+ }
+
+ /**
+ * A {@link SimMachineContext} that intercepts the shutdown calls.
+ */
+ private class Context implements SimMachineContext {
+ private final SimMachineContext ctx;
+
+ private Context(SimMachineContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public FlowGraph getGraph() {
+ return ctx.getGraph();
+ }
+
+ @Override
+ public Map<String, Object> getMeta() {
+ return ctx.getMeta();
+ }
+
+ @Override
+ public List<? extends SimProcessingUnit> getCpus() {
+ return ctx.getCpus();
+ }
+
+ @Override
+ public SimMemory getMemory() {
+ return ctx.getMemory();
+ }
+
+ @Override
+ public List<? extends SimNetworkInterface> getNetworkInterfaces() {
+ return ctx.getNetworkInterfaces();
+ }
+
+ @Override
+ public List<? extends SimStorageInterface> getStorageInterfaces() {
+ return ctx.getStorageInterfaces();
+ }
+
+ @Override
+ public void reset() {
+ ctx.reset();
+ }
+
+ @Override
+ public void shutdown() {
+ final SimWorkload[] workloads = SimChainWorkload.this.workloads;
+ final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex;
+
+ if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) {
+ ctx.reset();
+
+ if (doStart(workloads[activeWorkloadIndex])) {
+ return;
+ }
+ }
+
+ ctx.shutdown();
+ }
+
+ /**
+ * Start the specified workload.
+ *
+ * @return <code>true</code> if the workload started successfully, <code>false</code> otherwise.
+ */
+ private boolean doStart(SimWorkload workload) {
+ try {
+ workload.onStart(this);
+ } catch (Exception cause) {
+ LOGGER.warn("Workload failed during onStart callback", cause);
+ doStop(workload);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Stop the specified workload.
+ *
+ * @return <code>true</code> if the workload stopped successfully, <code>false</code> otherwise.
+ */
+ private boolean doStop(SimWorkload workload) {
+ try {
+ workload.onStop(this);
+ } catch (Exception cause) {
+ LOGGER.warn("Workload failed during onStop callback", cause);
+ return false;
+ }
+
+ return true;
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java
index 255fd1b2..f3efbebb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java
@@ -49,9 +49,9 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic {
* Construct a new {@link SimFlopsWorkload}.
*
* @param flops The number of floating point operations to perform for this task in MFLOPs.
- * @param utilization A model of the CPU utilization of the application.
+ * @param utilization The CPU utilization of the workload.
*/
- public SimFlopsWorkload(long flops, double utilization) {
+ SimFlopsWorkload(long flops, double utilization) {
if (flops < 0) {
throw new IllegalArgumentException("Number of FLOPs must be positive");
} else if (utilization <= 0.0 || utilization > 1.0) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java
index c3380b31..194efafd 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java
@@ -48,9 +48,9 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic {
* Construct a new {@link SimRuntimeWorkload}.
*
* @param duration The duration of the workload in milliseconds.
- * @param utilization A model of the CPU utilization of the application.
+ * @param utilization The CPU utilization of the workload.
*/
- public SimRuntimeWorkload(long duration, double utilization) {
+ SimRuntimeWorkload(long duration, double utilization) {
if (duration < 0) {
throw new IllegalArgumentException("Duration must be positive");
} else if (utilization <= 0.0 || utilization > 1.0) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java
new file mode 100644
index 00000000..82557d06
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload;
+
+import java.time.Duration;
+
+/**
+ * Helper methods for constructing {@link SimWorkload}s.
+ */
+public class SimWorkloads {
+ private SimWorkloads() {}
+
+ /**
+ * Create a {@link SimWorkload} that executes a specified number of floating point operations (FLOPs) at the given
+ * utilization.
+ *
+ * @param flops The number of floating point operations to perform for this task in MFLOPs.
+ * @param utilization The CPU utilization of the workload.
+ */
+ public static SimWorkload flops(long flops, double utilization) {
+ return new SimFlopsWorkload(flops, utilization);
+ }
+
+ /**
+ * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization.
+ *
+ * @param duration The duration of the workload in milliseconds.
+ * @param utilization The CPU utilization of the workload.
+ */
+ public static SimWorkload runtime(long duration, double utilization) {
+ return new SimRuntimeWorkload(duration, utilization);
+ }
+
+ /**
+ * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization.
+ *
+ * @param duration The duration of the workload.
+ * @param utilization The CPU utilization of the workload.
+ */
+ public static SimWorkload runtime(Duration duration, double utilization) {
+ return runtime(duration.toMillis(), utilization);
+ }
+
+ /**
+ * Chain the specified <code>workloads</code> into a single {@link SimWorkload}.
+ */
+ public static SimWorkload chain(SimWorkload... workloads) {
+ return new SimChainWorkload(workloads);
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
index c23f48dc..b354caff 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
@@ -39,31 +39,8 @@ public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map<Strin
return suspendCancellableCoroutine { cont ->
cont.invokeOnCancellation { this@runWorkload.cancel() }
- startWorkload(
- object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- try {
- workload.onStart(ctx)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onStop(ctx: SimMachineContext) {
- try {
- workload.onStop(ctx)
-
- if (!cont.isCompleted) {
- cont.resume(Unit)
- }
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
- },
- meta
- )
+ startWorkload(workload, meta) { cause ->
+ if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit)
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index f0aae15b..266839bd 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -40,9 +40,9 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.model.StorageDevice
import org.opendc.simulator.compute.power.CpuPowerModels
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.compute.workload.SimWorkloads
import org.opendc.simulator.flow2.FlowEngine
import org.opendc.simulator.flow2.source.SimpleFlowSource
import org.opendc.simulator.kotlin.runSimulation
@@ -78,7 +78,7 @@ class SimMachineTest {
machineModel
)
- machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0))
+ machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
// Two cores execute 1000 MFlOps per second (1000 ms)
assertEquals(1000, clock.millis())
@@ -123,7 +123,7 @@ class SimMachineTest {
machineModel
)
- machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0))
+ machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
// Two sockets with two cores execute 2000 MFlOps per second (500 ms)
assertEquals(500, clock.millis())
@@ -142,7 +142,7 @@ class SimMachineTest {
source.connect(machine.psu)
coroutineScope {
- launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) }
+ launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) }
yield()
assertAll(
@@ -304,7 +304,7 @@ class SimMachineTest {
try {
coroutineScope {
- launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) }
+ launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) }
cancel()
}
} catch (_: CancellationException) {
@@ -326,11 +326,11 @@ class SimMachineTest {
coroutineScope {
launch {
- machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0))
+ machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
}
assertThrows<IllegalStateException> {
- machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0))
+ machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index ba5a5c68..d11b91ee 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -38,10 +38,9 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.runWorkload
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimTraceFragment
+import org.opendc.simulator.compute.workload.SimWorkloads
import org.opendc.simulator.flow2.FlowEngine
import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory
import org.opendc.simulator.kotlin.runSimulation
@@ -99,7 +98,7 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testRuntimeWorkload() = runSimulation {
val duration = 5 * 60L * 1000
- val workload = SimRuntimeWorkload(duration, 1.0)
+ val workload = SimWorkloads.runtime(duration, 1.0)
val engine = FlowEngine.create(coroutineContext, clock)
val graph = engine.newGraph()
@@ -123,7 +122,7 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testFlopsWorkload() = runSimulation {
val duration = 5 * 60L * 1000
- val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
+ val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0)
val engine = FlowEngine.create(coroutineContext, clock)
val graph = engine.newGraph()
@@ -155,13 +154,13 @@ internal class SimSpaceSharedHypervisorTest {
yield()
val vm = hypervisor.newMachine(machineModel)
- vm.runWorkload(SimRuntimeWorkload(duration, 1.0))
+ vm.runWorkload(SimWorkloads.runtime(duration, 1.0))
hypervisor.removeMachine(vm)
yield()
val vm2 = hypervisor.newMachine(machineModel)
- vm2.runWorkload(SimRuntimeWorkload(duration, 1.0))
+ vm2.runWorkload(SimWorkloads.runtime(duration, 1.0))
hypervisor.removeMachine(vm2)
machine.cancel()
@@ -184,7 +183,7 @@ internal class SimSpaceSharedHypervisorTest {
yield()
val vm = hypervisor.newMachine(machineModel)
- launch { vm.runWorkload(SimFlopsWorkload(10_000, 1.0)) }
+ launch { vm.runWorkload(SimWorkloads.runtime(10_000, 1.0)) }
yield()
assertAll(
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
new file mode 100644
index 00000000..6bf05f65
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.compute.SimBareMetalMachine
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.runWorkload
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Test suite for the [SimChainWorkload] class.
+ */
+class SimChainWorkloadTest {
+ private lateinit var machineModel: MachineModel
+
+ @BeforeEach
+ fun setUp() {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+
+ machineModel = MachineModel(
+ /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
+ /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+
+ @Test
+ fun testMultipleWorkloads() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workload =
+ SimWorkloads.chain(
+ SimRuntimeWorkload(1000, 1.0),
+ SimRuntimeWorkload(1000, 1.0)
+ )
+
+ machine.runWorkload(workload)
+
+ assertEquals(2000, clock.millis())
+ }
+
+ @Test
+ fun testStartFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workloadA = mockk<SimWorkload>()
+ every { workloadA.onStart(any()) } throws IllegalStateException("Staged")
+ every { workloadA.onStop(any()) } returns Unit
+
+ val workload =
+ SimWorkloads.chain(
+ workloadA,
+ SimRuntimeWorkload(1000, 1.0)
+ )
+
+ machine.runWorkload(workload)
+
+ assertEquals(0, clock.millis())
+ }
+
+ @Test
+ fun testStartFailureSecond() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workloadA = mockk<SimWorkload>()
+ every { workloadA.onStart(any()) } throws IllegalStateException("Staged")
+ every { workloadA.onStop(any()) } returns Unit
+
+ val workload =
+ SimWorkloads.chain(
+ SimRuntimeWorkload(1000, 1.0),
+ workloadA,
+ SimRuntimeWorkload(1000, 1.0)
+ )
+
+ machine.runWorkload(workload)
+
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testStopFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workloadA = spyk<SimWorkload>(SimRuntimeWorkload(1000, 1.0))
+ every { workloadA.onStop(any()) } throws IllegalStateException("Staged")
+
+ val workload =
+ SimWorkloads.chain(
+ workloadA,
+ SimRuntimeWorkload(1000, 1.0)
+ )
+
+ machine.runWorkload(workload)
+
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testStopFailureSecond() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workloadA = spyk<SimWorkload>(SimRuntimeWorkload(1000, 1.0))
+ every { workloadA.onStop(any()) } throws IllegalStateException("Staged")
+
+ val workload =
+ SimWorkloads.chain(
+ SimRuntimeWorkload(1000, 1.0),
+ workloadA,
+ SimRuntimeWorkload(1000, 1.0)
+ )
+
+ machine.runWorkload(workload)
+
+ assertEquals(2000, clock.millis())
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index c7123000..b165418a 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -119,7 +119,7 @@ internal class WorkflowServiceTest {
},
{ assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(46102707L, clock.millis()) { "Total duration incorrect" } }
+ { assertEquals(45977707L, clock.millis()) { "Total duration incorrect" } }
)
}
}