From cdc7df3c3d398a7af15014b4c0f6cd495c05fcce Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:44:58 +0200 Subject: build: Store method parameters in class files This change updates the build configuration to enable Java to emit method parameter information in the class files. This provides more useful error messages when not enough parameters are given. --- buildSrc/src/main/kotlin/java-conventions.gradle.kts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/buildSrc/src/main/kotlin/java-conventions.gradle.kts b/buildSrc/src/main/kotlin/java-conventions.gradle.kts index a639a9e1..8857d4ab 100644 --- a/buildSrc/src/main/kotlin/java-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/java-conventions.gradle.kts @@ -34,3 +34,7 @@ java { sourceCompatibility = Libs.jvmTarget targetCompatibility = Libs.jvmTarget } + +tasks.withType { + options.compilerArgs.add("-parameters") +} -- cgit v1.2.3 From 3528091684f610d80fcebb5b730d3a201e79a99a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:48:59 +0200 Subject: feat(sim/compute): Add completion parameter to startWorkload This change updates the interface of `SimMachine#startWorkload` to introduce a parameter `completion` that is invoked when the workload completes either successfully or due to failure. This functionality has often been implemented by wrapping a `SimWorkload` and catching its exceptions. However, since this functionality is used in all usages of `SimMachine#startWorkload` we instead embed it into `SimMachine` itself. 
--- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 31 ++++--------------- .../simulator/compute/SimAbstractMachine.java | 36 +++++++++++++++++----- .../simulator/compute/SimBareMetalMachine.java | 16 +++++++--- .../org/opendc/simulator/compute/SimMachine.java | 4 ++- .../simulator/compute/kernel/SimHypervisor.java | 19 +++++++++--- .../org/opendc/simulator/compute/Coroutines.kt | 29 ++--------------- 6 files changed, 66 insertions(+), 69 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index c07649bd..d07c50bc 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -43,7 +43,6 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow2.FlowGraph import java.time.Duration import java.time.Instant @@ -284,30 +283,12 @@ public class SimHost( check(_ctx == null) { "Concurrent hypervisor running" } // Launch hypervisor onto machine - _ctx = machine.startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - _bootTime = clock.instant() - _state = HostState.UP - hypervisor.onStart(ctx) - } catch (cause: Throwable) { - _state = HostState.ERROR - _ctx = null - throw cause - } - } - - override fun onStop(ctx: SimMachineContext) { - try { - hypervisor.onStop(ctx) - } finally { - _ctx = null - } - } - }, - emptyMap() - ) + _bootTime = clock.instant() + _state = HostState.UP + _ctx = machine.startWorkload(hypervisor, emptyMap()) { cause -> + _state = if 
(cause != null) HostState.ERROR else HostState.DOWN + _ctx = null + } } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java index cf5aed03..d90a7d6f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java @@ -25,6 +25,7 @@ package org.opendc.simulator.compute; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimNetworkAdapter; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.model.MemoryUnit; @@ -62,12 +63,13 @@ public abstract class SimAbstractMachine implements SimMachine { } @Override - public final SimMachineContext startWorkload(SimWorkload workload, Map meta) { + public final SimMachineContext startWorkload( + SimWorkload workload, Map meta, Consumer completion) { if (activeContext != null) { throw new IllegalStateException("A machine cannot run multiple workloads concurrently"); } - final Context ctx = createContext(workload, new HashMap<>(meta)); + final Context ctx = createContext(workload, new HashMap<>(meta), completion); ctx.start(); return ctx; } @@ -83,10 +85,12 @@ public abstract class SimAbstractMachine implements SimMachine { /** * Construct a new {@link Context} instance representing the active execution. * - * @param workload The workload to start on the machine. - * @param meta The metadata to pass to the workload. + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. 
+ * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. */ - protected abstract Context createContext(SimWorkload workload, Map meta); + protected abstract Context createContext( + SimWorkload workload, Map meta, Consumer completion); /** * Return the active {@link Context} instance (if any). @@ -102,7 +106,9 @@ public abstract class SimAbstractMachine implements SimMachine { private final SimAbstractMachine machine; private final SimWorkload workload; private final Map meta; + private final Consumer completion; private boolean isClosed; + private Exception cause; /** * Construct a new {@link Context} instance. @@ -110,11 +116,17 @@ public abstract class SimAbstractMachine implements SimMachine { * @param machine The {@link SimAbstractMachine} to which the context belongs. * @param workload The {@link SimWorkload} to which the context belongs. * @param meta The metadata passed to the context. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. 
*/ - public Context(SimAbstractMachine machine, SimWorkload workload, Map meta) { + public Context( + SimAbstractMachine machine, + SimWorkload workload, + Map meta, + Consumer completion) { this.machine = machine; this.workload = workload; this.meta = meta; + this.completion = completion; } @Override @@ -136,11 +148,19 @@ public abstract class SimAbstractMachine implements SimMachine { // Cancel all the resources associated with the machine doCancel(); + Exception e = this.cause; + try { workload.onStop(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStop callback", cause); + if (e != null) { + e.addSuppressed(cause); + } else { + e = cause; + } } + + completion.accept(e); } /** @@ -151,7 +171,7 @@ public abstract class SimAbstractMachine implements SimMachine { machine.activeContext = this; workload.onStart(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStart callback", cause); + this.cause = cause; shutdown(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java index aa7502d6..11356eb2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimPeripheral; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.model.ProcessingUnit; @@ -39,7 +40,7 @@ import org.opendc.simulator.flow2.Inlet; * *

* A {@link SimBareMetalMachine} is a stateful object, and you should be careful when operating this object concurrently. For - * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map)}. + * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map, Consumer)}. */ public final class SimBareMetalMachine extends SimAbstractMachine { /** @@ -192,8 +193,9 @@ public final class SimBareMetalMachine extends SimAbstractMachine { } @Override - protected SimAbstractMachine.Context createContext(SimWorkload workload, Map meta) { - return new Context(this, workload, meta); + protected SimAbstractMachine.Context createContext( + SimWorkload workload, Map meta, Consumer completion) { + return new Context(this, workload, meta, completion); } /** @@ -206,8 +208,12 @@ public final class SimBareMetalMachine extends SimAbstractMachine { private final List net; private final List disk; - private Context(SimBareMetalMachine machine, SimWorkload workload, Map meta) { - super(machine, workload, meta); + private Context( + SimBareMetalMachine machine, + SimWorkload workload, + Map meta, + Consumer completion) { + super(machine, workload, meta, completion); this.graph = machine.graph; this.cpus = machine.cpus; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java index 59599875..1f86aa02 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java @@ -24,6 +24,7 @@ package org.opendc.simulator.compute; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimPeripheral; import 
org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.workload.SimWorkload; @@ -47,10 +48,11 @@ public interface SimMachine { * * @param workload The workload to start on the machine. * @param meta The metadata to pass to the workload. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. * @return A {@link SimMachineContext} that represents the execution context for the workload. * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - SimMachineContext startWorkload(SimWorkload workload, Map meta); + SimMachineContext startWorkload(SimWorkload workload, Map meta, Consumer completion); /** * Cancel the active workload on this machine (if any). diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java index 6e295837..f03a0c20 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.SplittableRandom; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.opendc.simulator.compute.SimAbstractMachine; import org.opendc.simulator.compute.SimMachine; @@ -471,7 +472,8 @@ public final class SimHypervisor implements SimWorkload { } @Override - protected Context createContext(SimWorkload workload, Map meta) { + protected Context createContext( + SimWorkload workload, Map meta, Consumer completion) { if (isClosed) { throw new IllegalStateException("Virtual machine does not exist anymore"); } @@ -482,7 +484,15 @@ public final class 
SimHypervisor implements SimWorkload { } return new VmContext( - context, this, random, interferenceDomain, counters, SimHypervisor.this.counters, workload, meta); + context, + this, + random, + interferenceDomain, + counters, + SimHypervisor.this.counters, + workload, + meta, + completion); } @Override @@ -538,8 +548,9 @@ public final class SimHypervisor implements SimWorkload { VmCounters vmCounters, HvCounters hvCounters, SimWorkload workload, - Map meta) { - super(machine, workload, meta); + Map meta, + Consumer completion) { + super(machine, workload, meta, completion); this.context = context; this.random = random; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index c23f48dc..b354caff 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -39,31 +39,8 @@ public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map cont.invokeOnCancellation { this@runWorkload.cancel() } - startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - workload.onStart(ctx) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onStop(ctx: SimMachineContext) { - try { - workload.onStop(ctx) - - if (!cont.isCompleted) { - cont.resume(Unit) - } - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - }, - meta - ) + startWorkload(workload, meta) { cause -> + if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit) + } } } -- cgit v1.2.3 From 788c007599ac61a41460589f65454aac1857eb81 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:55:41 +0200 Subject: refactor(sim/compute): 
Provide workload constructors in SimWorkloads This change introduces a new class SimWorkloads which provides construction methods for the standard workloads available in OpenDC. --- .../opendc/experiments/workflow/TraceHelpers.kt | 4 +- .../opendc/faas/simulator/SimFaaSServiceTest.kt | 4 +- .../compute/workload/SimFlopsWorkload.java | 4 +- .../compute/workload/SimRuntimeWorkload.java | 4 +- .../compute/workload/SimWorkloadLifecycle.java | 60 --------------------- .../simulator/compute/workload/SimWorkloads.java | 63 ++++++++++++++++++++++ .../org/opendc/simulator/compute/SimMachineTest.kt | 14 ++--- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 13 +++-- 8 files changed, 84 insertions(+), 82 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt index 4dc3a775..b622362a 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt @@ -27,7 +27,7 @@ package org.opendc.experiments.workflow import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.trace.Trace import org.opendc.trace.conv.TABLE_TASKS import org.opendc.trace.conv.TASK_ALLOC_NCPUS @@ -74,7 +74,7 @@ public fun Trace.toJobs(): List { val submitTime = 
reader.getInstant(TASK_SUBMIT_TIME)!! val runtime = reader.getDuration(TASK_RUNTIME)!! val flops: Long = 4000 * runtime.seconds * grantedCpus - val workload = SimFlopsWorkload(flops, 1.0) + val workload = SimWorkloads.flops(flops, 1.0) val task = Task( UUID(0L, id), "", diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index aac54f57..6baee7ea 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -40,8 +40,8 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.kotlin.runSimulation import java.time.Duration import java.util.Random @@ -66,7 +66,7 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runSimulation { val random = Random(0) - val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000, 1.0) { + val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimWorkloads.runtime(1000, 1.0) { override suspend fun invoke() { delay(random.nextInt(1000).toLong()) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java index 255fd1b2..f3efbebb 100644 --- 
a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java @@ -49,9 +49,9 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic { * Construct a new {@link SimFlopsWorkload}. * * @param flops The number of floating point operations to perform for this task in MFLOPs. - * @param utilization A model of the CPU utilization of the application. + * @param utilization The CPU utilization of the workload. */ - public SimFlopsWorkload(long flops, double utilization) { + SimFlopsWorkload(long flops, double utilization) { if (flops < 0) { throw new IllegalArgumentException("Number of FLOPs must be positive"); } else if (utilization <= 0.0 || utilization > 1.0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java index c3380b31..194efafd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java @@ -48,9 +48,9 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic { * Construct a new {@link SimRuntimeWorkload}. * * @param duration The duration of the workload in milliseconds. - * @param utilization A model of the CPU utilization of the application. + * @param utilization The CPU utilization of the workload. 
*/ - public SimRuntimeWorkload(long duration, double utilization) { + SimRuntimeWorkload(long duration, double utilization) { if (duration < 0) { throw new IllegalArgumentException("Duration must be positive"); } else if (utilization <= 0.0 || utilization > 1.0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java deleted file mode 100644 index f0e2561f..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.simulator.compute.workload; - -import java.util.HashSet; -import org.opendc.simulator.compute.SimMachineContext; - -/** - * A helper class to manage the lifecycle of a {@link SimWorkload}. - */ -public final class SimWorkloadLifecycle { - private final SimMachineContext ctx; - private final HashSet waiting = new HashSet<>(); - - /** - * Construct a {@link SimWorkloadLifecycle} instance. - * - * @param ctx The {@link SimMachineContext} of the workload. - */ - public SimWorkloadLifecycle(SimMachineContext ctx) { - this.ctx = ctx; - } - - /** - * Register a "completer" callback that must be invoked before ending the lifecycle of the workload. - */ - public Runnable newCompleter() { - Runnable completer = new Runnable() { - @Override - public void run() { - final HashSet waiting = SimWorkloadLifecycle.this.waiting; - if (waiting.remove(this) && waiting.isEmpty()) { - ctx.shutdown(); - } - } - }; - waiting.add(completer); - return completer; - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java new file mode 100644 index 00000000..8655fd0a --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be 
included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload; + +import java.time.Duration; + +/** + * Helper methods for constructing {@link SimWorkload}s. + */ +public class SimWorkloads { + private SimWorkloads() {} + + /** + * Create a {@link SimWorkload} that executes a specified number of floating point operations (FLOPs) at the given + * utilization. + * + * @param flops The number of floating point operations to perform for this task in MFLOPs. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload flops(long flops, double utilization) { + return new SimFlopsWorkload(flops, utilization); + } + + /** + * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization. + * + * @param duration The duration of the workload in milliseconds. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload runtime(long duration, double utilization) { + return new SimRuntimeWorkload(duration, utilization); + } + + /** + * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization. + * + * @param duration The duration of the workload. + * @param utilization The CPU utilization of the workload. 
+ */ + public static SimWorkload runtime(Duration duration, double utilization) { + return runtime(duration.toMillis(), utilization); + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index f0aae15b..266839bd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -40,9 +40,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.power.CpuPowerModels -import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.source.SimpleFlowSource import org.opendc.simulator.kotlin.runSimulation @@ -78,7 +78,7 @@ class SimMachineTest { machineModel ) - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) // Two cores execute 1000 MFlOps per second (1000 ms) assertEquals(1000, clock.millis()) @@ -123,7 +123,7 @@ class SimMachineTest { machineModel ) - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) // Two sockets with two cores execute 2000 MFlOps per second (500 ms) assertEquals(500, clock.millis()) @@ -142,7 +142,7 @@ class SimMachineTest { source.connect(machine.psu) coroutineScope { - launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) } 
+ launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } yield() assertAll( @@ -304,7 +304,7 @@ class SimMachineTest { try { coroutineScope { - launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) } + launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } cancel() } } catch (_: CancellationException) { @@ -326,11 +326,11 @@ class SimMachineTest { coroutineScope { launch { - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } assertThrows { - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index ba5a5c68..d11b91ee 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -38,10 +38,9 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.runWorkload -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory import org.opendc.simulator.kotlin.runSimulation 
@@ -99,7 +98,7 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testRuntimeWorkload() = runSimulation { val duration = 5 * 60L * 1000 - val workload = SimRuntimeWorkload(duration, 1.0) + val workload = SimWorkloads.runtime(duration, 1.0) val engine = FlowEngine.create(coroutineContext, clock) val graph = engine.newGraph() @@ -123,7 +122,7 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testFlopsWorkload() = runSimulation { val duration = 5 * 60L * 1000 - val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0) val engine = FlowEngine.create(coroutineContext, clock) val graph = engine.newGraph() @@ -155,13 +154,13 @@ internal class SimSpaceSharedHypervisorTest { yield() val vm = hypervisor.newMachine(machineModel) - vm.runWorkload(SimRuntimeWorkload(duration, 1.0)) + vm.runWorkload(SimWorkloads.runtime(duration, 1.0)) hypervisor.removeMachine(vm) yield() val vm2 = hypervisor.newMachine(machineModel) - vm2.runWorkload(SimRuntimeWorkload(duration, 1.0)) + vm2.runWorkload(SimWorkloads.runtime(duration, 1.0)) hypervisor.removeMachine(vm2) machine.cancel() @@ -184,7 +183,7 @@ internal class SimSpaceSharedHypervisorTest { yield() val vm = hypervisor.newMachine(machineModel) - launch { vm.runWorkload(SimFlopsWorkload(10_000, 1.0)) } + launch { vm.runWorkload(SimWorkloads.runtime(10_000, 1.0)) } yield() assertAll( -- cgit v1.2.3 From 801ef92fb4d65e301e2488528f1cadd8b4ab6fdd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:56:49 +0200 Subject: feat(sim/compute): Add support for resetting machine context This change updates the interface of `SimMachineContext` to allow workloads to reset all resources provided by the machine to the workload. This allows us to implement a `SimWorkload` that can compose multiple workloads. 
--- .../simulator/compute/SimAbstractMachine.java | 41 ++++++++++++---------- .../simulator/compute/SimMachineContext.java | 5 +++ 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java index d90a7d6f..d968d884 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java @@ -134,6 +134,28 @@ public abstract class SimAbstractMachine implements SimMachine { return meta; } + @Override + public void reset() { + final FlowGraph graph = getMemory().getInput().getGraph(); + + for (SimProcessingUnit cpu : getCpus()) { + final Inlet inlet = cpu.getInput(); + graph.disconnect(inlet); + } + + graph.disconnect(getMemory().getInput()); + + for (SimNetworkInterface ifx : getNetworkInterfaces()) { + ((NetworkAdapter) ifx).disconnect(); + } + + for (SimStorageInterface storage : getStorageInterfaces()) { + StorageDevice impl = (StorageDevice) storage; + graph.disconnect(impl.getRead()); + graph.disconnect(impl.getWrite()); + } + } + @Override public final void shutdown() { if (isClosed) { @@ -180,24 +202,7 @@ public abstract class SimAbstractMachine implements SimMachine { * Run the stop procedures for the resources associated with the machine. 
*/ protected void doCancel() { - final FlowGraph graph = getMemory().getInput().getGraph(); - - for (SimProcessingUnit cpu : getCpus()) { - final Inlet inlet = cpu.getInput(); - graph.disconnect(inlet); - } - - graph.disconnect(getMemory().getInput()); - - for (SimNetworkInterface ifx : getNetworkInterfaces()) { - ((NetworkAdapter) ifx).disconnect(); - } - - for (SimStorageInterface storage : getStorageInterfaces()) { - StorageDevice impl = (StorageDevice) storage; - graph.disconnect(impl.getRead()); - graph.disconnect(impl.getWrite()); - } + reset(); } @Override diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java index f6a3bd38..5d08e2b7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java @@ -67,6 +67,11 @@ public interface SimMachineContext { */ List getStorageInterfaces(); + /** + * Reset all resources of the machine. + */ + void reset(); + /** * Shutdown the workload. */ -- cgit v1.2.3 From eb105ae96191d99745c145e5ef0959f5e5c77fc4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:58:21 +0200 Subject: feat(sim/compute): Add support for chaining workloads This change adds a new static method `chain` to `SimWorkloads` to chain multiple workloads sequentially. 
--- .../compute/workload/SimChainWorkload.java | 182 +++++++++++++++++++++ .../simulator/compute/workload/SimWorkloads.java | 7 + .../compute/workload/SimChainWorkloadTest.kt | 176 ++++++++++++++++++++ 3 files changed, 365 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java new file mode 100644 index 00000000..9304122a --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload; + +import java.util.List; +import java.util.Map; +import org.opendc.simulator.compute.SimMachineContext; +import org.opendc.simulator.compute.SimMemory; +import org.opendc.simulator.compute.SimNetworkInterface; +import org.opendc.simulator.compute.SimProcessingUnit; +import org.opendc.simulator.compute.SimStorageInterface; +import org.opendc.simulator.flow2.FlowGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link SimWorkload} that composes multiple {@link SimWorkload}s sequentially. + */ +final class SimChainWorkload implements SimWorkload { + private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class); + + private final SimWorkload[] workloads; + private int activeWorkloadIndex; + + private Context activeContext; + + /** + * Construct a {@link SimChainWorkload} instance. + * + * @param workloads The workloads to chain. + */ + SimChainWorkload(SimWorkload...
workloads) { + this.workloads = workloads; + } + + @Override + public void onStart(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = new Context(ctx); + activeContext = context; + + if (!context.doStart(workloads[activeWorkloadIndex])) { + ctx.shutdown(); + } + } + + @Override + public void onStop(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = activeContext; + activeContext = null; + + context.doStop(workloads[activeWorkloadIndex]); + } + + /** + * A {@link SimMachineContext} that intercepts the shutdown calls. + */ + private class Context implements SimMachineContext { + private final SimMachineContext ctx; + + private Context(SimMachineContext ctx) { + this.ctx = ctx; + } + + @Override + public FlowGraph getGraph() { + return ctx.getGraph(); + } + + @Override + public Map getMeta() { + return ctx.getMeta(); + } + + @Override + public List getCpus() { + return ctx.getCpus(); + } + + @Override + public SimMemory getMemory() { + return ctx.getMemory(); + } + + @Override + public List getNetworkInterfaces() { + return ctx.getNetworkInterfaces(); + } + + @Override + public List getStorageInterfaces() { + return ctx.getStorageInterfaces(); + } + + @Override + public void reset() { + ctx.reset(); + } + + @Override + public void shutdown() { + final SimWorkload[] workloads = SimChainWorkload.this.workloads; + final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex; + + if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) { + ctx.reset(); + + if (doStart(workloads[activeWorkloadIndex])) { + return; + } + } + + ctx.shutdown(); + } + + /** + * Start the specified 
workload. + * + * @return true if the workload started successfully, false otherwise. + */ + private boolean doStart(SimWorkload workload) { + try { + workload.onStart(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStart callback", cause); + doStop(workload); + return false; + } + + return true; + } + + /** + * Stop the specified workload. + * + * @return true if the workload stopped successfully, false otherwise. + */ + private boolean doStop(SimWorkload workload) { + try { + workload.onStop(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStop callback", cause); + return false; + } + + return true; + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java index 8655fd0a..82557d06 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java @@ -60,4 +60,11 @@ public class SimWorkloads { public static SimWorkload runtime(Duration duration, double utilization) { return runtime(duration.toMillis(), utilization); } + + /** + * Chain the specified workloads into a single {@link SimWorkload}. + */ + public static SimWorkload chain(SimWorkload... 
workloads) { + return new SimChainWorkload(workloads); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt new file mode 100644 index 00000000..6bf05f65 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.simulator.compute.workload + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.runWorkload +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.kotlin.runSimulation + +/** + * Test suite for the [SimChainWorkload] class. + */ +class SimChainWorkloadTest { + private lateinit var machineModel: MachineModel + + @BeforeEach + fun setUp() { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = MachineModel( + /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + @Test + fun testMultipleWorkloads() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } + + @Test + fun testStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + 
machine.runWorkload(workload) + + assertEquals(0, clock.millis()) + } + + @Test + fun testStartFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } +} -- cgit v1.2.3 From d5aed4b1e6e5548728c5978e3b46d1472b62e791 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 17 Oct 2022 22:03:02 +0200 Subject: refactor(compute/sim): Use workload chaining for boot delay This change updates the implementation of `SimHost` to use 
workload chaining for modelling boot delays. Previously, this was implemented by sleeping 1 millisecond using Kotlin coroutines. With this change, we remove the need for coroutines and instead use the `SimDurationWorkload` to model the boot delay. In the future, we envision a user-supplied stochastic boot model to model the boot delay for VM instances. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 10 ++-- .../opendc/compute/simulator/SimWorkloadMapper.kt | 1 + .../simulator/internal/DefaultWorkloadMapper.kt | 44 ++++++++++++++ .../org/opendc/compute/simulator/internal/Guest.kt | 66 +++++++------------- .../org/opendc/compute/simulator/SimHostTest.kt | 70 +++++++++++++++++++++- .../experiments/capelin/CapelinIntegrationTest.kt | 22 +++---- .../experiments/compute/HostsProvisioningStep.kt | 1 - .../opendc/workflow/service/WorkflowServiceTest.kt | 2 +- 8 files changed, 149 insertions(+), 67 deletions(-) create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index d07c50bc..660c1ccc 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -34,6 +34,7 @@ import org.opendc.compute.service.driver.telemetry.GuestCpuStats import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.service.driver.telemetry.HostCpuStats import org.opendc.compute.service.driver.telemetry.HostSystemStats +import org.opendc.compute.simulator.internal.DefaultWorkloadMapper import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import 
org.opendc.simulator.compute.SimBareMetalMachine @@ -47,7 +48,6 @@ import org.opendc.simulator.flow2.FlowGraph import java.time.Duration import java.time.Instant import java.util.UUID -import kotlin.coroutines.CoroutineContext /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -56,11 +56,10 @@ public class SimHost( override val uid: UUID, override val name: String, override val meta: Map, - private val context: CoroutineContext, graph: FlowGraph, private val machine: SimBareMetalMachine, private val hypervisor: SimHypervisor, - private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), + private val mapper: SimWorkloadMapper = DefaultWorkloadMapper, private val optimize: Boolean = false ) : Host, AutoCloseable { /** @@ -129,7 +128,6 @@ public class SimHost( val machine = hypervisor.newMachine(key.flavor.toMachineModel()) val newGuest = Guest( - context, clock, this, hypervisor, @@ -250,7 +248,7 @@ public class SimHost( override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" - public suspend fun fail() { + public fun fail() { reset(HostState.ERROR) for (guest in _guests) { @@ -272,7 +270,7 @@ public class SimHost( } /** - * The [Job] that represents the machine running the hypervisor. + * The [SimMachineContext] that represents the machine running the hypervisor. */ private var _ctx: SimMachineContext? 
= null diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt index 7082c5cf..83baa61a 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt @@ -22,6 +22,7 @@ package org.opendc.compute.simulator +import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.simulator.compute.workload.SimWorkload diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt new file mode 100644 index 00000000..c5293a8d --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/DefaultWorkloadMapper.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import org.opendc.compute.api.Server +import org.opendc.compute.simulator.SimMetaWorkloadMapper +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads +import java.time.Duration + +/** + * A [SimWorkloadMapper] that introduces a boot delay of 1 ms. This object exists to retain the old behavior while + * introducing the possibility of adding custom boot delays. + */ +internal object DefaultWorkloadMapper : SimWorkloadMapper { + private val delegate = SimMetaWorkloadMapper() + + override fun createWorkload(server: Server): SimWorkload { + val workload = delegate.createWorkload(server) + val bootWorkload = SimWorkloads.runtime(Duration.ofMillis(1), 0.8) + return SimWorkloads.chain(bootWorkload, workload) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 790d8047..6d3a5bc7 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator.internal -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.Server import
org.opendc.compute.api.ServerState @@ -35,20 +29,17 @@ import org.opendc.compute.service.driver.telemetry.GuestCpuStats import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimVirtualMachine -import org.opendc.simulator.compute.runWorkload -import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock import java.time.Duration import java.time.Instant -import kotlin.coroutines.CoroutineContext /** * A virtual machine instance that is managed by a [SimHost]. */ internal class Guest( - context: CoroutineContext, private val clock: Clock, val host: SimHost, private val hypervisor: SimHypervisor, @@ -57,11 +48,6 @@ internal class Guest( val server: Server, val machine: SimVirtualMachine ) { - /** - * The [CoroutineScope] of the guest. - */ - private val scope: CoroutineScope = CoroutineScope(context + Job()) - /** * The logger instance of this guest. */ @@ -78,7 +64,7 @@ internal class Guest( /** * Start the guest. */ - suspend fun start() { + fun start() { when (state) { ServerState.TERMINATED, ServerState.ERROR -> { logger.info { "User requested to start server ${server.uid}" } @@ -96,7 +82,7 @@ internal class Guest( /** * Stop the guest. */ - suspend fun stop() { + fun stop() { when (state) { ServerState.RUNNING -> doStop(ServerState.TERMINATED) ServerState.ERROR -> doRecover() @@ -111,12 +97,11 @@ internal class Guest( * This operation will stop the guest if it is running on the host and remove all resources associated with the * guest. 
*/ - suspend fun delete() { + fun delete() { stop() state = ServerState.DELETED hypervisor.removeMachine(machine) - scope.cancel() } /** @@ -124,7 +109,7 @@ internal class Guest( * * This operation forcibly stops the guest and puts the server into an error state. */ - suspend fun fail() { + fun fail() { if (state != ServerState.RUNNING) { return } @@ -135,7 +120,7 @@ internal class Guest( /** * Recover the guest if it is in an error state. */ - suspend fun recover() { + fun recover() { if (state != ServerState.ERROR) { return } @@ -175,37 +160,34 @@ internal class Guest( } /** - * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + * The [SimMachineContext] representing the current active virtual machine instance or `null` if no virtual machine + * is active. */ - private var job: Job? = null + private var ctx: SimMachineContext? = null /** * Launch the guest on the simulated */ - private suspend fun doStart() { - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - val job = scope.launch { runMachine(workload) } - this.job = job + private fun doStart() { + assert(ctx == null) { "Concurrent job running" } - state = ServerState.RUNNING onStart() - job.invokeOnCompletion { cause -> - this.job = null - onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + val workload = mapper.createWorkload(server) + val meta = mapOf("driver" to host, "server" to server) + server.meta + ctx = machine.startWorkload(workload, meta) { cause -> + onStop(if (cause != null) ServerState.ERROR else ServerState.TERMINATED) + ctx = null } } /** * Attempt to stop the server and put it into [target] state. 
*/ - private suspend fun doStop(target: ServerState) { - assert(job != null) { "Invalid job state" } - val job = job ?: return - job.cancel() - job.join() + private fun doStop(target: ServerState) { + assert(ctx != null) { "Invalid job state" } + val ctx = ctx ?: return + ctx.shutdown() state = target } @@ -217,14 +199,6 @@ internal class Guest( state = ServerState.TERMINATED } - /** - * Converge the process that models the virtual machine lifecycle as a coroutine. - */ - private suspend fun runMachine(workload: SimWorkload) { - delay(1) // TODO Introduce model for boot time - machine.runWorkload(workload, mapOf("driver" to host, "server" to server) + server.meta) - } - /** * This method is invoked when the guest was started on the host and has booted into a running state. */ diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a5999bcd..02be3f28 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -69,6 +69,74 @@ internal class SimHostTest { ) } + /** + * Test a single virtual machine hosted by the hypervisor. 
+ */ + @Test + fun testSingle() = runSimulation { + val duration = 5 * 60L + + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) + + val host = SimHost( + uid = UUID.randomUUID(), + name = "test", + meta = emptyMap(), + graph, + machine, + hypervisor + ) + val vmImage = MockImage( + UUID.randomUUID(), + "", + emptyMap(), + mapOf( + "workload" to + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) + ).createWorkload(1) + ) + ) + + val flavor = MockFlavor(2, 0) + + coroutineScope { + launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImage)) } + + suspendCancellableCoroutine { cont -> + host.addListener(object : HostListener { + private var finished = 0 + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 1) { + cont.resume(Unit) + } + } + }) + } + } + + // Ensure last cycle is collected + delay(1000L * duration) + host.close() + + val cpuStats = host.getCpuStats() + + assertAll( + { assertEquals(639, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(2360, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(56, cpuStats.stealTime, "Steal time does not match") }, + { assertEquals(1500001, clock.millis()) } + ) + } + /** * Test overcommitting of resources by the hypervisor. 
*/ @@ -86,7 +154,6 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - coroutineContext, graph, machine, hypervisor @@ -169,7 +236,6 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - coroutineContext, graph, machine, hypervisor diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 47058caa..77b0d09f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -120,9 +120,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, - { assertEquals(223394204, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66976984, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3160316, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(223394101, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977086, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160276, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, { assertEquals(5.84093E9, monitor.energyUsage, 1E4) { "Incorrect power draw" } } ) @@ -160,8 +160,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10999504, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741294, monitor.activeTime) { "Active time 
incorrect" } }, + { assertEquals(10999514, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741285, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(7.0116E8, monitor.energyUsage, 1E4) { "Incorrect power draw" } } @@ -199,10 +199,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6027979, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712820, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(12532979, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(445913, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6028018, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712781, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532934, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(424267, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -229,8 +229,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10085103, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(8539212, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(10085111, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(8539204, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(2328039558, monitor.uptime) { "Uptime incorrect" } } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt index 292be929..16a57236 
100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -58,7 +58,6 @@ public class HostsProvisioningStep internal constructor( spec.uid, spec.name, spec.meta, - ctx.coroutineContext, graph, machine, hypervisor, diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index c7123000..b165418a 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -119,7 +119,7 @@ internal class WorkflowServiceTest { }, { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(46102707L, clock.millis()) { "Total duration incorrect" } } + { assertEquals(45977707L, clock.millis()) { "Total duration incorrect" } } ) } } -- cgit v1.2.3 From c6f2d16a20bfac466480c0e98341b08b12fc0772 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 18 Oct 2022 11:01:37 +0200 Subject: feat(compute/sim): Model host boot time This change updates `SimHost` to support modeling the time and resource consumption it takes to boot the host. The boot procedure is modeled as a `SimWorkload`. 
--- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 57 ++++++++++++++++------ .../org/opendc/compute/simulator/SimHostTest.kt | 6 +-- .../experiments/compute/HostsProvisioningStep.kt | 2 +- 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 660c1ccc..ee607066 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -44,28 +44,38 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.flow2.FlowGraph +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads +import java.time.Clock import java.time.Duration import java.time.Instant import java.util.UUID +import java.util.function.Supplier /** - * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. + * A [Host] implementation that simulates virtual machines on a physical machine using [SimHypervisor]. + * + * @param uid The unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param clock The (virtual) clock used to track time. + * @param machine The [SimBareMetalMachine] on which the host runs. + * @param hypervisor The [SimHypervisor] to run on top of the machine. + * @param mapper A [SimWorkloadMapper] to map a [Server] to a [SimWorkload]. + * @param bootModel A [Supplier] providing the [SimWorkload] to execute during the boot procedure of the hypervisor. 
+ * @param optimize A flag to indicate to optimize the machine models of the virtual machines. */ public class SimHost( override val uid: UUID, override val name: String, override val meta: Map, - graph: FlowGraph, + private val clock: Clock, private val machine: SimBareMetalMachine, private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper = DefaultWorkloadMapper, + private val bootModel: Supplier = Supplier { null }, private val optimize: Boolean = false ) : Host, AutoCloseable { - /** - * The clock instance used by the host. - */ - private val clock = graph.engine.clock /** * The event listeners registered with this host. @@ -272,20 +282,39 @@ public class SimHost( /** * The [SimMachineContext] that represents the machine running the hypervisor. */ - private var _ctx: SimMachineContext? = null + private var ctx: SimMachineContext? = null /** * Launch the hypervisor. */ private fun launch() { - check(_ctx == null) { "Concurrent hypervisor running" } + check(ctx == null) { "Concurrent hypervisor running" } + + val bootWorkload = bootModel.get() + val hypervisor = hypervisor + val hypervisorWorkload = object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + _bootTime = clock.instant() + _state = HostState.UP + hypervisor.onStart(ctx) + } catch (cause: Throwable) { + _state = HostState.ERROR + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + hypervisor.onStop(ctx) + } + } + + val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload // Launch hypervisor onto machine - _bootTime = clock.instant() - _state = HostState.UP - _ctx = machine.startWorkload(hypervisor, emptyMap()) { cause -> + ctx = machine.startWorkload(workload, emptyMap()) { cause -> _state = if (cause != null) HostState.ERROR else HostState.DOWN - _ctx = null + ctx = null } } @@ -296,7 +325,7 @@ public class SimHost( updateUptime() // Stop the hypervisor - _ctx?.shutdown() + 
ctx?.shutdown() _state = state } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 02be3f28..27151422 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -86,7 +86,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - graph, + clock, machine, hypervisor ) @@ -154,7 +154,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - graph, + clock, machine, hypervisor ) @@ -236,7 +236,7 @@ internal class SimHostTest { uid = UUID.randomUUID(), name = "test", meta = emptyMap(), - graph, + clock, machine, hypervisor ) diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt index 16a57236..e224fb84 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -58,7 +58,7 @@ public class HostsProvisioningStep internal constructor( spec.uid, spec.name, spec.meta, - graph, + ctx.clock, machine, hypervisor, optimize = optimize -- cgit v1.2.3 From dd5bbd55fc6e25efdfe93ec16bd37c5350e04c16 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 14:37:22 +0200 Subject: refactor(compute/service): Do not suspend on guest start This change updates the `Host` interface to remove the suspend modifiers from the start, stop, spawn, and delete methods of this interface.
We now assume that the host immediately launches the guest on invocation of this method. --- .../org/opendc/compute/service/driver/Host.kt | 11 ++-- .../compute/service/internal/ComputeServiceImpl.kt | 64 +++++++++------------- .../opendc/compute/service/ComputeServiceTest.kt | 4 +- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 29 ++++------ .../org/opendc/compute/simulator/internal/Guest.kt | 1 + .../org/opendc/compute/simulator/SimHostTest.kt | 37 +++++++------ 6 files changed, 65 insertions(+), 81 deletions(-) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index fad8757e..efcc0f2c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -70,11 +70,8 @@ public interface Host { /** * Register the specified [instance][server] on the host. - * - * Once the method returns, the instance should be running if [start] is true or else the instance should be - * stopped. */ - public suspend fun spawn(server: Server, start: Boolean = true) + public fun spawn(server: Server) /** * Determine whether the specified [instance][server] exists on the host. @@ -86,19 +83,19 @@ public interface Host { * * @throws IllegalArgumentException if the server is not present on the host. */ - public suspend fun start(server: Server) + public fun start(server: Server) /** * Stop the server [instance][server] if it is currently running on this host. * * @throws IllegalArgumentException if the server is not present on the host. */ - public suspend fun stop(server: Server) + public fun stop(server: Server) /** * Delete the specified [instance][server] on this host and cleanup all resources associated with it. 
*/ - public suspend fun delete(server: Server) + public fun delete(server: Server) /** * Add a [HostListener] to this host. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 0fe016aa..b377c3e3 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,6 @@ package org.opendc.compute.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.common.util.Pacer import org.opendc.compute.api.ComputeClient @@ -53,22 +48,17 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use in the service. + * @param coroutineContext The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( - private val context: CoroutineContext, + coroutineContext: CoroutineContext, private val clock: Clock, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { - /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - /** * The logger instance of this server. 
*/ @@ -115,6 +105,9 @@ internal class ComputeServiceImpl( private val serverById = mutableMapOf() override val servers: MutableList = mutableListOf() + override val hosts: Set + get() = hostToView.keys + private var maxCores = 0 private var maxMemory = 0L private var _attemptsSuccess = 0L @@ -122,17 +115,15 @@ internal class ComputeServiceImpl( private var _attemptsError = 0L private var _serversPending = 0 private var _serversActive = 0 + private var isClosed = false /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } - - override val hosts: Set - get() = hostToView.keys + private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } override fun newClient(): ComputeClient { - check(scope.isActive) { "Service is already closed" } + check(!isClosed) { "Service is already closed" } return object : ComputeClient { private var isClosed: Boolean = false @@ -285,7 +276,12 @@ internal class ComputeServiceImpl( } override fun close() { - scope.cancel() + if (isClosed) { + return + } + + isClosed = true + pacer.cancel() } override fun getSchedulerStats(): SchedulerStats { @@ -379,29 +375,23 @@ internal class ComputeServiceImpl( logger.info { "Assigned server $server to host $host." } - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. 
- hv.instanceCount++ - hv.provisionedCores += server.flavor.cpuCount - hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + try { + server.host = host - scope.launch { - try { - server.host = host - host.spawn(server) - activeServers[server] = host + host.spawn(server) + host.start(server) - _serversActive++ - _attemptsSuccess++ - } catch (e: Throwable) { - logger.error(e) { "Failed to deploy VM" } + _serversActive++ + _attemptsSuccess++ - hv.instanceCount-- - hv.provisionedCores -= server.flavor.cpuCount - hv.availableMemory += server.flavor.memorySize + hv.instanceCount++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize - _attemptsError++ - } + activeServers[server] = host + } catch (e: Throwable) { + logger.error(e) { "Failed to deploy VM" } + _attemptsError++ } } } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 73e9b3d7..c18709f3 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -348,7 +348,7 @@ internal class ComputeServiceTest { // Start server server.start() delay(5L * 60 * 1000) - coVerify { host.spawn(capture(slot), true) } + coVerify { host.spawn(capture(slot)) } listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } @@ -376,7 +376,7 @@ internal class ComputeServiceTest { every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } - coEvery { host.spawn(any(), true) } throws IllegalStateException() + coEvery { host.spawn(any()) } throws IllegalStateException() service.addHost(host) diff --git 
a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index ee607066..b3e56f38 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,7 +22,6 @@ package org.opendc.compute.simulator -import kotlinx.coroutines.yield import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState @@ -132,8 +131,8 @@ public class SimHost( return sufficientMemory && enoughCpus && canFit } - override suspend fun spawn(server: Server, start: Boolean) { - val guest = guests.computeIfAbsent(server) { key -> + override fun spawn(server: Server) { + guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } val machine = hypervisor.newMachine(key.flavor.toMachineModel()) @@ -150,27 +149,23 @@ public class SimHost( _guests.add(newGuest) newGuest } - - if (start) { - guest.start() - } } override fun contains(server: Server): Boolean { return server in guests } - override suspend fun start(server: Server) { + override fun start(server: Server) { val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } guest.start() } - override suspend fun stop(server: Server) { + override fun stop(server: Server) { val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } guest.stop() } - override suspend fun delete(server: Server) { + override fun delete(server: Server) { val guest = guests[server] ?: return guest.delete() } @@ -266,17 +261,10 @@ public class SimHost( } } - public suspend fun recover() { + public fun recover() { updateUptime() launch() - - // Wait for the hypervisor to launch before recovering the guests - yield() - - for (guest in _guests) { - 
guest.recover() - } } /** @@ -298,6 +286,11 @@ public class SimHost( _bootTime = clock.instant() _state = HostState.UP hypervisor.onStart(ctx) + + // Recover the guests that were running on the hypervisor. + for (guest in _guests) { + guest.recover() + } } catch (cause: Throwable) { _state = HostState.ERROR throw cause diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 6d3a5bc7..c12e6fad 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -60,6 +60,7 @@ internal class Guest( * a server. */ var state: ServerState = ServerState.TERMINATED + private set /** * Start the guest. diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 27151422..fc581d3e 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.simulator import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -107,20 +106,19 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) - coroutineScope { - launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImage)) } + suspendCancellableCoroutine { cont -> + host.addListener(object : HostListener { + private var finished = 0 - 
suspendCancellableCoroutine { cont -> - host.addListener(object : HostListener { - private var finished = 0 - - override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - if (newState == ServerState.TERMINATED && ++finished == 1) { - cont.resume(Unit) - } + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 1) { + cont.resume(Unit) } - }) - } + } + }) + val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage) + host.spawn(server) + host.start(server) } // Ensure last cycle is collected @@ -190,9 +188,6 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) coroutineScope { - launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } - launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } - suspendCancellableCoroutine { cont -> host.addListener(object : HostListener { private var finished = 0 @@ -203,6 +198,13 @@ internal class SimHostTest { } } }) + val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA) + host.spawn(serverA) + val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB) + host.spawn(serverB) + + host.start(serverA) + host.start(serverB) } } @@ -259,12 +261,13 @@ internal class SimHostTest { coroutineScope { host.spawn(server) + host.start(server) delay(5000L) host.fail() delay(duration * 1000) host.recover() - suspendCancellableCoroutine { cont -> + suspendCancellableCoroutine { cont -> host.addListener(object : HostListener { override fun onStateChanged(host: Host, server: Server, newState: ServerState) { if (newState == ServerState.TERMINATED) { -- cgit v1.2.3 From 8bf940eb7b59b5e5e326cfc06d51bdb54393f33b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 14:41:50 +0200 Subject: perf(compute/sim): Use static logger field This change updates the `Guest` class implementation to use a static logger field instead of allocating a new logger for
every guest. --- .../kotlin/org/opendc/compute/simulator/internal/Guest.kt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index c12e6fad..ca947625 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -48,11 +48,6 @@ internal class Guest( val server: Server, val machine: SimVirtualMachine ) { - /** - * The logger instance of this guest. - */ - private val logger = KotlinLogging.logger {} - /** * The state of the [Guest]. * @@ -68,12 +63,12 @@ internal class Guest( fun start() { when (state) { ServerState.TERMINATED, ServerState.ERROR -> { - logger.info { "User requested to start server ${server.uid}" } + LOGGER.info { "User requested to start server ${server.uid}" } doStart() } ServerState.RUNNING -> return ServerState.DELETED -> { - logger.warn { "User tried to start deleted server" } + LOGGER.warn { "User tried to start deleted server" } throw IllegalArgumentException("Server is deleted") } else -> assert(false) { "Invalid state transition" } @@ -239,4 +234,9 @@ internal class Guest( _downtime += duration } } + + private companion object { + @JvmStatic + private val LOGGER = KotlinLogging.logger {} + } } -- cgit v1.2.3