diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-28 16:46:47 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-28 16:46:47 +0200 |
| commit | b96acc687f59b698fbc4d4c984d77b008cd4051b (patch) | |
| tree | ff4d99454259b92ec4484433bb716acd6319faa1 | |
| parent | c4cfb6f6e0507f335fd88935857f20e88c34abd0 (diff) | |
| parent | 8bf940eb7b59b5e5e326cfc06d51bdb54393f33b (diff) | |
Support custom start-up and clean-up time for VMs (#112)
This pull request implements customizable startup and clean-up time for virtual machine.
We achieve this by implementing `SimWorkload` chaining and modelling start-up and
clean-up time using `SimWorkload` as well.
Implements #33
## Implementation Notes :hammer_and_pick:
* Store method parameters in class files
* Add completion parameter to startWorkload
* Provide workload constructors in SimWorkloads
* Add support for resetting machine context
* Add support for chaining workloads
* Use workload chaining for boot delay
* Model host boot time
* Do not suspend on guest start
* Use static logger field
## Breaking API Changes :warning:
* `SimMachine#startWorkload` now has a third parameter `completion` which is invoked when
the workload finishes executing (either due to failure or success).
* `SimFlopsWorkload` and `SimRuntimeWorkload` can be instantiated via `SimWorkloads`.
27 files changed, 767 insertions, 294 deletions
diff --git a/buildSrc/src/main/kotlin/java-conventions.gradle.kts b/buildSrc/src/main/kotlin/java-conventions.gradle.kts index a639a9e1..8857d4ab 100644 --- a/buildSrc/src/main/kotlin/java-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/java-conventions.gradle.kts @@ -34,3 +34,7 @@ java { sourceCompatibility = Libs.jvmTarget targetCompatibility = Libs.jvmTarget } + +tasks.withType<JavaCompile> { + options.compilerArgs.add("-parameters") +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index fad8757e..efcc0f2c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -70,11 +70,8 @@ public interface Host { /** * Register the specified [instance][server] on the host. - * - * Once the method returns, the instance should be running if [start] is true or else the instance should be - * stopped. */ - public suspend fun spawn(server: Server, start: Boolean = true) + public fun spawn(server: Server) /** * Determine whether the specified [instance][server] exists on the host. @@ -86,19 +83,19 @@ public interface Host { * * @throws IllegalArgumentException if the server is not present on the host. */ - public suspend fun start(server: Server) + public fun start(server: Server) /** * Stop the server [instance][server] if it is currently running on this host. * * @throws IllegalArgumentException if the server is not present on the host. */ - public suspend fun stop(server: Server) + public fun stop(server: Server) /** * Delete the specified [instance][server] on this host and cleanup all resources associated with it. */ - public suspend fun delete(server: Server) + public fun delete(server: Server) /** * Add a [HostListener] to this host. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 0fe016aa..b377c3e3 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,6 @@ package org.opendc.compute.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.common.util.Pacer import org.opendc.compute.api.ComputeClient @@ -53,23 +48,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use in the service. + * @param coroutineContext The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( - private val context: CoroutineContext, + coroutineContext: CoroutineContext, private val clock: Clock, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The logger instance of this server. */ private val logger = KotlinLogging.logger {} @@ -115,6 +105,9 @@ internal class ComputeServiceImpl( private val serverById = mutableMapOf<UUID, InternalServer>() override val servers: MutableList<Server> = mutableListOf() + override val hosts: Set<Host> + get() = hostToView.keys + private var maxCores = 0 private var maxMemory = 0L private var _attemptsSuccess = 0L @@ -122,17 +115,15 @@ internal class ComputeServiceImpl( private var _attemptsError = 0L private var _serversPending = 0 private var _serversActive = 0 + private var isClosed = false /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } - - override val hosts: Set<Host> - get() = hostToView.keys + private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } override fun newClient(): ComputeClient { - check(scope.isActive) { "Service is already closed" } + check(!isClosed) { "Service is already closed" } return object : ComputeClient { private var isClosed: Boolean = false @@ -285,7 +276,12 @@ internal class ComputeServiceImpl( } override fun close() { - scope.cancel() + if (isClosed) { + return + } + + isClosed = true + pacer.cancel() } override fun getSchedulerStats(): SchedulerStats { @@ -379,29 +375,23 @@ internal class ComputeServiceImpl( logger.info { "Assigned server $server to host $host." } - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - hv.instanceCount++ - hv.provisionedCores += server.flavor.cpuCount - hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + try { + server.host = host - scope.launch { - try { - server.host = host - host.spawn(server) - activeServers[server] = host + host.spawn(server) + host.start(server) - _serversActive++ - _attemptsSuccess++ - } catch (e: Throwable) { - logger.error(e) { "Failed to deploy VM" } + _serversActive++ + _attemptsSuccess++ - hv.instanceCount-- - hv.provisionedCores -= server.flavor.cpuCount - hv.availableMemory += server.flavor.memorySize + hv.instanceCount++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize - _attemptsError++ - } + activeServers[server] = host + } catch (e: Throwable) { + logger.error(e) { "Failed to deploy VM" } + _attemptsError++ } } } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 73e9b3d7..c18709f3 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -348,7 +348,7 @@ internal class ComputeServiceTest { // Start server server.start() delay(5L * 60 * 1000) - coVerify { host.spawn(capture(slot), true) } + coVerify { host.spawn(capture(slot)) } listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } @@ -376,7 +376,7 @@ internal class ComputeServiceTest { every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } - coEvery { host.spawn(any(), true) } throws IllegalStateException() + coEvery { host.spawn(any()) } throws IllegalStateException() service.addHost(host) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index c07649bd..b3e56f38 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,7 +22,6 @@ package org.opendc.compute.simulator -import kotlinx.coroutines.yield import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState @@ -34,6 +33,7 @@ import org.opendc.compute.service.driver.telemetry.GuestCpuStats import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.service.driver.telemetry.HostCpuStats import org.opendc.compute.service.driver.telemetry.HostSystemStats +import org.opendc.compute.simulator.internal.DefaultWorkloadMapper import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.SimBareMetalMachine @@ -44,30 +44,37 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow2.FlowGraph +import org.opendc.simulator.compute.workload.SimWorkloads +import java.time.Clock import java.time.Duration import java.time.Instant import java.util.UUID -import kotlin.coroutines.CoroutineContext +import java.util.function.Supplier /** - * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. + * A [Host] implementation that simulates virtual machines on a physical machine using [SimHypervisor]. + * + * @param uid The unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param clock The (virtual) clock used to track time. + * @param machine The [SimBareMetalMachine] on which the host runs. + * @param hypervisor The [SimHypervisor] to run on top of the machine. + * @param mapper A [SimWorkloadMapper] to map a [Server] to a [SimWorkload]. + * @param bootModel A [Supplier] providing the [SimWorkload] to execute during the boot procedure of the hypervisor. + * @param optimize A flag to indicate to optimize the machine models of the virtual machines. */ public class SimHost( override val uid: UUID, override val name: String, override val meta: Map<String, Any>, - private val context: CoroutineContext, - graph: FlowGraph, + private val clock: Clock, private val machine: SimBareMetalMachine, private val hypervisor: SimHypervisor, - private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), + private val mapper: SimWorkloadMapper = DefaultWorkloadMapper, + private val bootModel: Supplier<SimWorkload?> = Supplier { null }, private val optimize: Boolean = false ) : Host, AutoCloseable { - /** - * The clock instance used by the host. - */ - private val clock = graph.engine.clock /** * The event listeners registered with this host. @@ -124,13 +131,12 @@ public class SimHost( return sufficientMemory && enoughCpus && canFit } - override suspend fun spawn(server: Server, start: Boolean) { - val guest = guests.computeIfAbsent(server) { key -> + override fun spawn(server: Server) { + guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } val machine = hypervisor.newMachine(key.flavor.toMachineModel()) val newGuest = Guest( - context, clock, this, hypervisor, @@ -143,27 +149,23 @@ public class SimHost( _guests.add(newGuest) newGuest } - - if (start) { - guest.start() - } } override fun contains(server: Server): Boolean { return server in guests } - override suspend fun start(server: Server) { + override fun start(server: Server) { val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } guest.start() } - override suspend fun stop(server: Server) { + override fun stop(server: Server) { val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } guest.stop() } - override suspend fun delete(server: Server) { + override fun delete(server: Server) { val guest = guests[server] ?: return guest.delete() } @@ -251,7 +253,7 @@ public class SimHost( override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" - public suspend fun fail() { + public fun fail() { reset(HostState.ERROR) for (guest in _guests) { @@ -259,55 +261,54 @@ public class SimHost( } } - public suspend fun recover() { + public fun recover() { updateUptime() launch() - - // Wait for the hypervisor to launch before recovering the guests - yield() - - for (guest in _guests) { - guest.recover() - } } /** - * The [Job] that represents the machine running the hypervisor. + * The [SimMachineContext] that represents the machine running the hypervisor. */ - private var _ctx: SimMachineContext? = null + private var ctx: SimMachineContext? = null /** * Launch the hypervisor. */ private fun launch() { - check(_ctx == null) { "Concurrent hypervisor running" } - - // Launch hypervisor onto machine - _ctx = machine.startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - _bootTime = clock.instant() - _state = HostState.UP - hypervisor.onStart(ctx) - } catch (cause: Throwable) { - _state = HostState.ERROR - _ctx = null - throw cause + check(ctx == null) { "Concurrent hypervisor running" } + + val bootWorkload = bootModel.get() + val hypervisor = hypervisor + val hypervisorWorkload = object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + _bootTime = clock.instant() + _state = HostState.UP + hypervisor.onStart(ctx) + + // Recover the guests that were running on the hypervisor. + for (guest in _guests) { + guest.recover() } + } catch (cause: Throwable) { + _state = HostState.ERROR + throw cause } + } - override fun onStop(ctx: SimMachineContext) { - try { - hypervisor.onStop(ctx) - } finally { - _ctx = null - } - } - }, - emptyMap() - ) + override fun onStop(ctx: SimMachineContext) { + hypervisor.onStop(ctx) + } + } + + val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload + + // Launch hypervisor onto machine + ctx = machine.startWorkload(workload, emptyMap()) { cause -> + _state = if (cause != null) HostState.ERROR else HostState.DOWN + ctx = null + } } /** @@ -317,7 +318,7 @@ public class SimHost( updateUptime() // Stop the hypervisor - _ctx?.shutdown() + ctx?.shutdown() _state = state } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt index 7082c5cf..83baa61a 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt @@ -22,6 +22,7 @@ package org.opendc.compute.simulator +import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.simulator.compute.workload.SimWorkload diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt index f0e2561f..c5293a8d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt @@ -20,41 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.workload; +package org.opendc.compute.simulator.internal -import java.util.HashSet; -import org.opendc.simulator.compute.SimMachineContext; +import org.opendc.compute.api.Server +import org.opendc.compute.simulator.SimMetaWorkloadMapper +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads +import java.time.Duration /** - * A helper class to manage the lifecycle of a {@link SimWorkload}. + * A [SimWorkloadMapper] to introduces a boot delay of 1 ms. This object exists to retain the old behavior while + * introducing the possibility of adding custom boot delays. */ -public final class SimWorkloadLifecycle { - private final SimMachineContext ctx; - private final HashSet<Runnable> waiting = new HashSet<>(); +internal object DefaultWorkloadMapper : SimWorkloadMapper { + private val delegate = SimMetaWorkloadMapper() - /** - * Construct a {@link SimWorkloadLifecycle} instance. - * - * @param ctx The {@link SimMachineContext} of the workload. - */ - public SimWorkloadLifecycle(SimMachineContext ctx) { - this.ctx = ctx; - } - - /** - * Register a "completer" callback that must be invoked before ending the lifecycle of the workload. - */ - public Runnable newCompleter() { - Runnable completer = new Runnable() { - @Override - public void run() { - final HashSet<Runnable> waiting = SimWorkloadLifecycle.this.waiting; - if (waiting.remove(this) && waiting.isEmpty()) { - ctx.shutdown(); - } - } - }; - waiting.add(completer); - return completer; + override fun createWorkload(server: Server): SimWorkload { + val workload = delegate.createWorkload(server) + val bootWorkload = SimWorkloads.runtime(Duration.ofMillis(1), 0.8) + return SimWorkloads.chain(bootWorkload, workload) } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 790d8047..ca947625 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator.internal -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState @@ -35,20 +29,17 @@ import org.opendc.compute.service.driver.telemetry.GuestCpuStats import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimVirtualMachine -import org.opendc.simulator.compute.runWorkload -import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock import java.time.Duration import java.time.Instant -import kotlin.coroutines.CoroutineContext /** * A virtual machine instance that is managed by a [SimHost]. */ internal class Guest( - context: CoroutineContext, private val clock: Clock, val host: SimHost, private val hypervisor: SimHypervisor, @@ -58,35 +49,26 @@ internal class Guest( val machine: SimVirtualMachine ) { /** - * The [CoroutineScope] of the guest. - */ - private val scope: CoroutineScope = CoroutineScope(context + Job()) - - /** - * The logger instance of this guest. - */ - private val logger = KotlinLogging.logger {} - - /** * The state of the [Guest]. * * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for * a server. */ var state: ServerState = ServerState.TERMINATED + private set /** * Start the guest. */ - suspend fun start() { + fun start() { when (state) { ServerState.TERMINATED, ServerState.ERROR -> { - logger.info { "User requested to start server ${server.uid}" } + LOGGER.info { "User requested to start server ${server.uid}" } doStart() } ServerState.RUNNING -> return ServerState.DELETED -> { - logger.warn { "User tried to start deleted server" } + LOGGER.warn { "User tried to start deleted server" } throw IllegalArgumentException("Server is deleted") } else -> assert(false) { "Invalid state transition" } @@ -96,7 +78,7 @@ internal class Guest( /** * Stop the guest. */ - suspend fun stop() { + fun stop() { when (state) { ServerState.RUNNING -> doStop(ServerState.TERMINATED) ServerState.ERROR -> doRecover() @@ -111,12 +93,11 @@ internal class Guest( * This operation will stop the guest if it is running on the host and remove all resources associated with the * guest. */ - suspend fun delete() { + fun delete() { stop() state = ServerState.DELETED hypervisor.removeMachine(machine) - scope.cancel() } /** @@ -124,7 +105,7 @@ internal class Guest( * * This operation forcibly stops the guest and puts the server into an error state. */ - suspend fun fail() { + fun fail() { if (state != ServerState.RUNNING) { return } @@ -135,7 +116,7 @@ internal class Guest( /** * Recover the guest if it is in an error state. */ - suspend fun recover() { + fun recover() { if (state != ServerState.ERROR) { return } @@ -175,37 +156,34 @@ internal class Guest( } /** - * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + * The [SimMachineContext] representing the current active virtual machine instance or `null` if no virtual machine + * is active. */ - private var job: Job? = null + private var ctx: SimMachineContext? = null /** * Launch the guest on the simulated */ - private suspend fun doStart() { - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - val job = scope.launch { runMachine(workload) } - this.job = job + private fun doStart() { + assert(ctx == null) { "Concurrent job running" } - state = ServerState.RUNNING onStart() - job.invokeOnCompletion { cause -> - this.job = null - onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + val workload = mapper.createWorkload(server) + val meta = mapOf("driver" to host, "server" to server) + server.meta + ctx = machine.startWorkload(workload, meta) { cause -> + onStop(if (cause != null) ServerState.ERROR else ServerState.TERMINATED) + ctx = null } } /** * Attempt to stop the server and put it into [target] state. */ - private suspend fun doStop(target: ServerState) { - assert(job != null) { "Invalid job state" } - val job = job ?: return - job.cancel() - job.join() + private fun doStop(target: ServerState) { + assert(ctx != null) { "Invalid job state" } + val ctx = ctx ?: return + ctx.shutdown() state = target } @@ -218,14 +196,6 @@ internal class Guest( } /** - * Converge the process that models the virtual machine lifecycle as a coroutine. - */ - private suspend fun runMachine(workload: SimWorkload) { - delay(1) // TODO Introduce model for boot time - machine.runWorkload(workload, mapOf("driver" to host, "server" to server) + server.meta) - } - - /** * This method is invoked when the guest was started on the host and has booted into a running state. */ private fun onStart() { @@ -264,4 +234,9 @@ internal class Guest( _downtime += duration } } + + private companion object { + @JvmStatic + private val LOGGER = KotlinLogging.logger {} + } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a5999bcd..fc581d3e 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.simulator import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -70,6 +69,73 @@ internal class SimHostTest { } /** + * Test a single virtual machine hosted by the hypervisor. + */ + @Test + fun testSingle() = runSimulation { + val duration = 5 * 60L + + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) + + val host = SimHost( + uid = UUID.randomUUID(), + name = "test", + meta = emptyMap(), + clock, + machine, + hypervisor + ) + val vmImage = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) + ).createWorkload(1) + ) + ) + + val flavor = MockFlavor(2, 0) + + suspendCancellableCoroutine { cont -> + host.addListener(object : HostListener { + private var finished = 0 + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 1) { + cont.resume(Unit) + } + } + }) + val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage) + host.spawn(server) + host.start(server) + } + + // Ensure last cycle is collected + delay(1000L * duration) + host.close() + + val cpuStats = host.getCpuStats() + + assertAll( + { assertEquals(639, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(2360, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(56, cpuStats.stealTime, "Steal time does not match") }, + { assertEquals(1500001, clock.millis()) } + ) + } + + /** * Test overcommitting of resources by the hypervisor. */ @Test @@ -86,8 +152,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - coroutineContext, - graph, + clock, machine, hypervisor ) @@ -123,9 +188,6 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) coroutineScope { - launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } - launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } - suspendCancellableCoroutine { cont -> host.addListener(object : HostListener { private var finished = 0 @@ -136,6 +198,13 @@ internal class SimHostTest { } } }) + val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA) + host.spawn(serverA) + val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB) + host.spawn(serverB) + + host.start(serverA) + host.start(serverB) } } @@ -169,8 +238,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - coroutineContext, - graph, + clock, machine, hypervisor ) @@ -193,12 +261,13 @@ internal class SimHostTest { coroutineScope { host.spawn(server) + host.start(server) delay(5000L) host.fail() delay(duration * 1000) host.recover() - suspendCancellableCoroutine<Unit> { cont -> + suspendCancellableCoroutine { cont -> host.addListener(object : HostListener { override fun onStateChanged(host: Host, server: Server, newState: ServerState) { if (newState == ServerState.TERMINATED) { diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 47058caa..77b0d09f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -120,9 +120,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, - { assertEquals(223394204, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66976984, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3160316, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(223394101, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977086, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160276, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, { assertEquals(5.84093E9, monitor.energyUsage, 1E4) { "Incorrect power draw" } } ) @@ -160,8 +160,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10999504, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741294, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(10999514, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741285, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(7.0116E8, monitor.energyUsage, 1E4) { "Incorrect power draw" } } @@ -199,10 +199,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6027979, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712820, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(12532979, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(445913, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6028018, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712781, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532934, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(424267, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -229,8 +229,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10085103, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(8539212, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(10085111, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(8539204, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(2328039558, monitor.uptime) { "Uptime incorrect" } } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt index 292be929..e224fb84 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -58,8 +58,7 @@ public class HostsProvisioningStep internal constructor( spec.uid, spec.name, spec.meta, - ctx.coroutineContext, - graph, + ctx.clock, machine, hypervisor, optimize = optimize diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt index 4dc3a775..b622362a 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt @@ -27,7 +27,7 @@ package org.opendc.experiments.workflow import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.trace.Trace import org.opendc.trace.conv.TABLE_TASKS import org.opendc.trace.conv.TASK_ALLOC_NCPUS @@ -74,7 +74,7 @@ public fun Trace.toJobs(): List<Job> { val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!! val runtime = reader.getDuration(TASK_RUNTIME)!! val flops: Long = 4000 * runtime.seconds * grantedCpus - val workload = SimFlopsWorkload(flops, 1.0) + val workload = SimWorkloads.flops(flops, 1.0) val task = Task( UUID(0L, id), "<unnamed>", diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index aac54f57..6baee7ea 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -40,8 +40,8 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.kotlin.runSimulation import java.time.Duration import java.util.Random @@ -66,7 +66,7 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runSimulation { val random = Random(0) - val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000, 1.0) { + val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimWorkloads.runtime(1000, 1.0) { override suspend fun invoke() { delay(random.nextInt(1000).toLong()) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java index cf5aed03..d968d884 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java @@ -25,6 +25,7 @@ package org.opendc.simulator.compute; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimNetworkAdapter; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.model.MemoryUnit; @@ -62,12 +63,13 @@ public abstract class SimAbstractMachine implements SimMachine { } @Override - public final SimMachineContext startWorkload(SimWorkload workload, Map<String, Object> meta) { + public final SimMachineContext startWorkload( + SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion) { if (activeContext != null) { throw new IllegalStateException("A machine cannot run multiple workloads concurrently"); } - final Context ctx = createContext(workload, new HashMap<>(meta)); + final Context ctx = createContext(workload, new HashMap<>(meta), completion); ctx.start(); return ctx; } @@ -83,10 +85,12 @@ public abstract class SimAbstractMachine implements SimMachine { /** * Construct a new {@link Context} instance representing the active execution. * - * @param workload The workload to start on the machine. - * @param meta The metadata to pass to the workload. + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. */ - protected abstract Context createContext(SimWorkload workload, Map<String, Object> meta); + protected abstract Context createContext( + SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion); /** * Return the active {@link Context} instance (if any). @@ -102,7 +106,9 @@ public abstract class SimAbstractMachine implements SimMachine { private final SimAbstractMachine machine; private final SimWorkload workload; private final Map<String, Object> meta; + private final Consumer<Exception> completion; private boolean isClosed; + private Exception cause; /** * Construct a new {@link Context} instance. @@ -110,11 +116,17 @@ public abstract class SimAbstractMachine implements SimMachine { * @param machine The {@link SimAbstractMachine} to which the context belongs. * @param workload The {@link SimWorkload} to which the context belongs. * @param meta The metadata passed to the context. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. */ - public Context(SimAbstractMachine machine, SimWorkload workload, Map<String, Object> meta) { + public Context( + SimAbstractMachine machine, + SimWorkload workload, + Map<String, Object> meta, + Consumer<Exception> completion) { this.machine = machine; this.workload = workload; this.meta = meta; + this.completion = completion; } @Override @@ -123,6 +135,28 @@ public abstract class SimAbstractMachine implements SimMachine { } @Override + public void reset() { + final FlowGraph graph = getMemory().getInput().getGraph(); + + for (SimProcessingUnit cpu : getCpus()) { + final Inlet inlet = cpu.getInput(); + graph.disconnect(inlet); + } + + graph.disconnect(getMemory().getInput()); + + for (SimNetworkInterface ifx : getNetworkInterfaces()) { + ((NetworkAdapter) ifx).disconnect(); + } + + for (SimStorageInterface storage : getStorageInterfaces()) { + StorageDevice impl = (StorageDevice) storage; + graph.disconnect(impl.getRead()); + graph.disconnect(impl.getWrite()); + } + } + + @Override public final void shutdown() { if (isClosed) { return; @@ -136,11 +170,19 @@ public abstract class SimAbstractMachine implements SimMachine { // Cancel all the resources associated with the machine doCancel(); + Exception e = this.cause; + try { workload.onStop(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStop callback", cause); + if (e != null) { + e.addSuppressed(cause); + } else { + e = cause; + } } + + completion.accept(e); } /** @@ -151,7 +193,7 @@ public abstract class SimAbstractMachine implements SimMachine { machine.activeContext = this; workload.onStart(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStart callback", cause); + this.cause = cause; shutdown(); } } @@ -160,24 +202,7 @@ public abstract class SimAbstractMachine implements SimMachine { * Run the stop procedures for the resources associated with the machine. */ protected void doCancel() { - final FlowGraph graph = getMemory().getInput().getGraph(); - - for (SimProcessingUnit cpu : getCpus()) { - final Inlet inlet = cpu.getInput(); - graph.disconnect(inlet); - } - - graph.disconnect(getMemory().getInput()); - - for (SimNetworkInterface ifx : getNetworkInterfaces()) { - ((NetworkAdapter) ifx).disconnect(); - } - - for (SimStorageInterface storage : getStorageInterfaces()) { - StorageDevice impl = (StorageDevice) storage; - graph.disconnect(impl.getRead()); - graph.disconnect(impl.getWrite()); - } + reset(); } @Override diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java index aa7502d6..11356eb2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimPeripheral; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.model.ProcessingUnit; @@ -39,7 +40,7 @@ import org.opendc.simulator.flow2.Inlet; * * <p> * A {@link SimBareMetalMachine} is a stateful object, and you should be careful when operating this object concurrently. For - * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map)}. + * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map, Consumer)} )}. */ public final class SimBareMetalMachine extends SimAbstractMachine { /** @@ -192,8 +193,9 @@ public final class SimBareMetalMachine extends SimAbstractMachine { } @Override - protected SimAbstractMachine.Context createContext(SimWorkload workload, Map<String, Object> meta) { - return new Context(this, workload, meta); + protected SimAbstractMachine.Context createContext( + SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion) { + return new Context(this, workload, meta, completion); } /** @@ -206,8 +208,12 @@ public final class SimBareMetalMachine extends SimAbstractMachine { private final List<NetworkAdapter> net; private final List<StorageDevice> disk; - private Context(SimBareMetalMachine machine, SimWorkload workload, Map<String, Object> meta) { - super(machine, workload, meta); + private Context( + SimBareMetalMachine machine, + SimWorkload workload, + Map<String, Object> meta, + Consumer<Exception> completion) { + super(machine, workload, meta, completion); this.graph = machine.graph; this.cpus = machine.cpus; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java index 59599875..1f86aa02 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java @@ -24,6 +24,7 @@ package org.opendc.simulator.compute; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimPeripheral; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.workload.SimWorkload; @@ -47,10 +48,11 @@ public interface SimMachine { * * @param workload The workload to start on the machine. * @param meta The metadata to pass to the workload. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. * @return A {@link SimMachineContext} that represents the execution context for the workload. * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - SimMachineContext startWorkload(SimWorkload workload, Map<String, Object> meta); + SimMachineContext startWorkload(SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion); /** * Cancel the active workload on this machine (if any). diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java index f6a3bd38..5d08e2b7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java @@ -68,6 +68,11 @@ public interface SimMachineContext { List<? extends SimStorageInterface> getStorageInterfaces(); /** + * Reset all resources of the machine. + */ + void reset(); + + /** * Shutdown the workload. */ void shutdown(); diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java index 6e295837..f03a0c20 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.SplittableRandom; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.opendc.simulator.compute.SimAbstractMachine; import org.opendc.simulator.compute.SimMachine; @@ -471,7 +472,8 @@ public final class SimHypervisor implements SimWorkload { } @Override - protected Context createContext(SimWorkload workload, Map<String, Object> meta) { + protected Context createContext( + SimWorkload workload, Map<String, Object> meta, Consumer<Exception> completion) { if (isClosed) { throw new IllegalStateException("Virtual machine does not exist anymore"); } @@ -482,7 +484,15 @@ public final class SimHypervisor implements SimWorkload { } return new VmContext( - context, this, random, interferenceDomain, counters, SimHypervisor.this.counters, workload, meta); + context, + this, + random, + interferenceDomain, + counters, + SimHypervisor.this.counters, + workload, + meta, + completion); } @Override @@ -538,8 +548,9 @@ public final class SimHypervisor implements SimWorkload { VmCounters vmCounters, HvCounters hvCounters, SimWorkload workload, - Map<String, Object> meta) { - super(machine, workload, meta); + Map<String, Object> meta, + Consumer<Exception> completion) { + super(machine, workload, meta, completion); this.context = context; this.random = random; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java new file mode 100644 index 00000000..9304122a --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload; + +import java.util.List; +import java.util.Map; +import org.opendc.simulator.compute.SimMachineContext; +import org.opendc.simulator.compute.SimMemory; +import org.opendc.simulator.compute.SimNetworkInterface; +import org.opendc.simulator.compute.SimProcessingUnit; +import org.opendc.simulator.compute.SimStorageInterface; +import org.opendc.simulator.flow2.FlowGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link SimWorkload} that composes two {@link SimWorkload}s. + */ +final class SimChainWorkload implements SimWorkload { + private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class); + + private final SimWorkload[] workloads; + private int activeWorkloadIndex; + + private Context activeContext; + + /** + * Construct a {@link SimChainWorkload} instance. + * + * @param workloads The workloads to chain. + */ + SimChainWorkload(SimWorkload... workloads) { + this.workloads = workloads; + } + + @Override + public void onStart(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = new Context(ctx); + activeContext = context; + + if (!context.doStart(workloads[activeWorkloadIndex])) { + ctx.shutdown(); + } + } + + @Override + public void onStop(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = activeContext; + activeContext = null; + + context.doStop(workloads[activeWorkloadIndex]); + } + + /** + * A {@link SimMachineContext} that intercepts the shutdown calls. + */ + private class Context implements SimMachineContext { + private final SimMachineContext ctx; + + private Context(SimMachineContext ctx) { + this.ctx = ctx; + } + + @Override + public FlowGraph getGraph() { + return ctx.getGraph(); + } + + @Override + public Map<String, Object> getMeta() { + return ctx.getMeta(); + } + + @Override + public List<? extends SimProcessingUnit> getCpus() { + return ctx.getCpus(); + } + + @Override + public SimMemory getMemory() { + return ctx.getMemory(); + } + + @Override + public List<? extends SimNetworkInterface> getNetworkInterfaces() { + return ctx.getNetworkInterfaces(); + } + + @Override + public List<? extends SimStorageInterface> getStorageInterfaces() { + return ctx.getStorageInterfaces(); + } + + @Override + public void reset() { + ctx.reset(); + } + + @Override + public void shutdown() { + final SimWorkload[] workloads = SimChainWorkload.this.workloads; + final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex; + + if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) { + ctx.reset(); + + if (doStart(workloads[activeWorkloadIndex])) { + return; + } + } + + ctx.shutdown(); + } + + /** + * Start the specified workload. + * + * @return <code>true</code> if the workload started successfully, <code>false</code> otherwise. + */ + private boolean doStart(SimWorkload workload) { + try { + workload.onStart(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStart callback", cause); + doStop(workload); + return false; + } + + return true; + } + + /** + * Stop the specified workload. + * + * @return <code>true</code> if the workload stopped successfully, <code>false</code> otherwise. + */ + private boolean doStop(SimWorkload workload) { + try { + workload.onStop(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStop callback", cause); + return false; + } + + return true; + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java index 255fd1b2..f3efbebb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java @@ -49,9 +49,9 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic { * Construct a new {@link SimFlopsWorkload}. * * @param flops The number of floating point operations to perform for this task in MFLOPs. - * @param utilization A model of the CPU utilization of the application. + * @param utilization The CPU utilization of the workload. */ - public SimFlopsWorkload(long flops, double utilization) { + SimFlopsWorkload(long flops, double utilization) { if (flops < 0) { throw new IllegalArgumentException("Number of FLOPs must be positive"); } else if (utilization <= 0.0 || utilization > 1.0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java index c3380b31..194efafd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java @@ -48,9 +48,9 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic { * Construct a new {@link SimRuntimeWorkload}. * * @param duration The duration of the workload in milliseconds. - * @param utilization A model of the CPU utilization of the application. + * @param utilization The CPU utilization of the workload. */ - public SimRuntimeWorkload(long duration, double utilization) { + SimRuntimeWorkload(long duration, double utilization) { if (duration < 0) { throw new IllegalArgumentException("Duration must be positive"); } else if (utilization <= 0.0 || utilization > 1.0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java new file mode 100644 index 00000000..82557d06 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload; + +import java.time.Duration; + +/** + * Helper methods for constructing {@link SimWorkload}s. + */ +public class SimWorkloads { + private SimWorkloads() {} + + /** + * Create a {@link SimWorkload} that executes a specified number of floating point operations (FLOPs) at the given + * utilization. + * + * @param flops The number of floating point operations to perform for this task in MFLOPs. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload flops(long flops, double utilization) { + return new SimFlopsWorkload(flops, utilization); + } + + /** + * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization. + * + * @param duration The duration of the workload in milliseconds. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload runtime(long duration, double utilization) { + return new SimRuntimeWorkload(duration, utilization); + } + + /** + * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization. + * + * @param duration The duration of the workload. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload runtime(Duration duration, double utilization) { + return runtime(duration.toMillis(), utilization); + } + + /** + * Chain the specified <code>workloads</code> into a single {@link SimWorkload}. + */ + public static SimWorkload chain(SimWorkload... workloads) { + return new SimChainWorkload(workloads); + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index c23f48dc..b354caff 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -39,31 +39,8 @@ public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map<Strin return suspendCancellableCoroutine { cont -> cont.invokeOnCancellation { this@runWorkload.cancel() } - startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - workload.onStart(ctx) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onStop(ctx: SimMachineContext) { - try { - workload.onStop(ctx) - - if (!cont.isCompleted) { - cont.resume(Unit) - } - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - }, - meta - ) + startWorkload(workload, meta) { cause -> + if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit) + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index f0aae15b..266839bd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -40,9 +40,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.power.CpuPowerModels -import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.source.SimpleFlowSource import org.opendc.simulator.kotlin.runSimulation @@ -78,7 +78,7 @@ class SimMachineTest { machineModel ) - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) // Two cores execute 1000 MFlOps per second (1000 ms) assertEquals(1000, clock.millis()) @@ -123,7 +123,7 @@ class SimMachineTest { machineModel ) - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) // Two sockets with two cores execute 2000 MFlOps per second (500 ms) assertEquals(500, clock.millis()) @@ -142,7 +142,7 @@ class SimMachineTest { source.connect(machine.psu) coroutineScope { - launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) } + launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } yield() assertAll( @@ -304,7 +304,7 @@ class SimMachineTest { try { coroutineScope { - launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) } + launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } cancel() } } catch (_: CancellationException) { @@ -326,11 +326,11 @@ class SimMachineTest { coroutineScope { launch { - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } assertThrows<IllegalStateException> { - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index ba5a5c68..d11b91ee 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -38,10 +38,9 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.runWorkload -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory import org.opendc.simulator.kotlin.runSimulation @@ -99,7 +98,7 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testRuntimeWorkload() = runSimulation { val duration = 5 * 60L * 1000 - val workload = SimRuntimeWorkload(duration, 1.0) + val workload = SimWorkloads.runtime(duration, 1.0) val engine = FlowEngine.create(coroutineContext, clock) val graph = engine.newGraph() @@ -123,7 +122,7 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testFlopsWorkload() = runSimulation { val duration = 5 * 60L * 1000 - val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0) val engine = FlowEngine.create(coroutineContext, clock) val graph = engine.newGraph() @@ -155,13 +154,13 @@ internal class SimSpaceSharedHypervisorTest { yield() val vm = hypervisor.newMachine(machineModel) - vm.runWorkload(SimRuntimeWorkload(duration, 1.0)) + vm.runWorkload(SimWorkloads.runtime(duration, 1.0)) hypervisor.removeMachine(vm) yield() val vm2 = hypervisor.newMachine(machineModel) - vm2.runWorkload(SimRuntimeWorkload(duration, 1.0)) + vm2.runWorkload(SimWorkloads.runtime(duration, 1.0)) hypervisor.removeMachine(vm2) machine.cancel() @@ -184,7 +183,7 @@ internal class SimSpaceSharedHypervisorTest { yield() val vm = hypervisor.newMachine(machineModel) - launch { vm.runWorkload(SimFlopsWorkload(10_000, 1.0)) } + launch { vm.runWorkload(SimWorkloads.runtime(10_000, 1.0)) } yield() assertAll( diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt new file mode 100644 index 00000000..6bf05f65 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.runWorkload +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.kotlin.runSimulation + +/** + * Test suite for the [SimChainWorkload] class. + */ +class SimChainWorkloadTest { + private lateinit var machineModel: MachineModel + + @BeforeEach + fun setUp() { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = MachineModel( + /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + @Test + fun testMultipleWorkloads() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } + + @Test + fun testStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk<SimWorkload>() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(0, clock.millis()) + } + + @Test + fun testStartFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk<SimWorkload>() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk<SimWorkload>(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk<SimWorkload>(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index c7123000..b165418a 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -119,7 +119,7 @@ internal class WorkflowServiceTest { }, { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(46102707L, clock.millis()) { "Total duration incorrect" } } + { assertEquals(45977707L, clock.millis()) { "Total duration incorrect" } } ) } } |
