diff options
89 files changed, 1864 insertions, 1014 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 503d1549..f78ab816 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,6 +17,7 @@ kotlinx-coroutines = "1.6.4" ktlint-gradle = "10.3.0" log4j = "2.18.0" microprofile-openapi = "3.0" +microprofile-config = "3.0.1" mockk = "1.12.5" parquet = "1.12.3" progressbar = "0.9.3" @@ -97,6 +98,7 @@ hadoop-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "had hadoop-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop" } commons-math3 = { module = "org.apache.commons:commons-math3", version.ref = "commons-math3" } microprofile-openapi-api = { module = "org.eclipse.microprofile.openapi:microprofile-openapi-api", version.ref = "microprofile-openapi" } +microprofile-config = { module = "org.eclipse.microprofile.config:microprofile-config-api", version.ref = "microprofile-config" } # Other (Build) dokka-gradle = { module = "org.jetbrains.dokka:dokka-gradle-plugin", version.ref = "dokka" } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt deleted file mode 100644 index f6744123..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.flow.FlowEngine -import java.time.Clock -import java.time.Duration -import java.util.* -import kotlin.coroutines.CoroutineContext -import kotlin.math.max - -/** - * Helper class to simulate VM-based workloads in OpenDC. - * - * @param context [CoroutineContext] to run the simulation in. - * @param clock [Clock] instance tracking simulation time. - * @param scheduler [ComputeScheduler] implementation to use for the service. - * @param schedulingQuantum The scheduling quantum of the scheduler. - */ -public class ComputeServiceHelper( - private val context: CoroutineContext, - private val clock: Clock, - scheduler: ComputeScheduler, - seed: Long, - schedulingQuantum: Duration = Duration.ofMinutes(5) -) : AutoCloseable { - /** - * The [ComputeService] that has been configured by the manager. - */ - public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum) - - /** - * The [FlowEngine] to simulate the hosts. - */ - private val engine = FlowEngine(context, clock) - - /** - * The hosts that belong to this class. - */ - private val hosts = mutableSetOf<SimHost>() - - /** - * The source of randomness. - */ - private val random = SplittableRandom(seed) - - /** - * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. - * - * @param trace The trace to simulate. - * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). - * @param failureModel A failure model to use for injecting failures. - * @param interference A flag to indicate that VM interference needs to be enabled. - */ - public suspend fun run( - trace: List<VirtualMachine>, - submitImmediately: Boolean = false, - failureModel: FailureModel? = null, - interference: Boolean = false, - ) { - val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) - val client = service.newClient() - val clock = clock - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - // Start the fault injector - injector?.start() - - var offset = Long.MIN_VALUE - - for (entry in trace.sortedBy { it.startTime }) { - val now = clock.millis() - val start = entry.startTime.toEpochMilli() - - if (offset < 0) { - offset = start - now - } - - // Make sure the trace entries are ordered by submission time - assert(start - offset >= 0) { "Invalid trace order" } - - if (!submitImmediately) { - delay(max(0, (start - offset) - now)) - } - - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload(entry.trace, workloadOffset) - val meta = mutableMapOf<String, Any>("workload" to workload) - - val interferenceProfile = entry.interferenceProfile - if (interference && interferenceProfile != null) { - meta["interference-profile"] = interferenceProfile - } - - launch { - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.cpuCount, - entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() - ), - meta = meta - ) - - // Wait for the server reach its end time - val endTime = entry.stopTime.toEpochMilli() - delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) - - // Stop the server after reaching the end-time of the virtual machine - server.stop() - } - } - } - - yield() - } finally { - injector?.close() - client.close() - } - } - - /** - * Register a host for this simulation. - * - * @param spec The definition of the host. - * @param optimize Merge the CPU resources of the host into a single CPU resource. - * @return The [SimHost] that has been constructed by the runner. - */ - public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { - val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver) - val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, random) - - val host = SimHost( - spec.uid, - spec.name, - spec.meta, - context, - clock, - machine, - hypervisor, - optimize = optimize - ) - - require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } - service.addHost(host) - - return host - } - - override fun close() { - service.close() - - for (host in hosts) { - host.close() - } - - hosts.clear() - } - - /** - * Construct a [ComputeService] instance. - */ - private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService { - return ComputeService(context, clock, scheduler, schedulingQuantum) - } -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt deleted file mode 100644 index 3b8dc918..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.topology - -/** - * Representation of the environment of the compute service, describing the physical details of every host. - */ -public interface Topology { - /** - * Resolve the [Topology] into a list of [HostSpec]s. - */ - public fun resolve(): List<HostSpec> -} diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts index 17eadf29..2cce8c1c 100644 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,17 @@ * SOFTWARE. */ -description = "Support library for simulating workflows with OpenDC" +description = "Experiment base for OpenDC" /* Build configuration */ plugins { `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` } dependencies { - api(projects.opendcWorkflow.opendcWorkflowService) + api(libs.microprofile.config) - implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcSimulator.opendcSimulatorCore) } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/MutableServiceRegistry.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/MutableServiceRegistry.kt new file mode 100644 index 00000000..160dd393 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/MutableServiceRegistry.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments + +/** + * A mutable [ServiceRegistry]. + */ +public interface MutableServiceRegistry : ServiceRegistry { + /** + * Register [service] for the specified [name] in this registry. + * + * @param name The name of the service to register, which should follow the rules for domain names as defined by + * DNS. + * @param type The interface provided by the service. + * @param service A reference to the actual implementation of the service. + */ + public fun <T : Any> register(name: String, type: Class<T>, service: T) + + /** + * Remove the service with [name] and [type] from this registry. + * + * @param name The name of the service to remove, which should follow the rules for domain names as defined by DNS. + * @param type The type of the service to remove. + */ + public fun remove(name: String, type: Class<*>) + + /** + * Remove all services registered with [name]. + * + * @param name The name of the services to remove, which should follow the rules for domain names as defined by DNS. + */ + public fun remove(name: String) + + /** + * Create a copy of the registry. + */ + public override fun clone(): MutableServiceRegistry +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/ServiceRegistry.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/ServiceRegistry.kt new file mode 100644 index 00000000..e9d5b50e --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/ServiceRegistry.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments + +/** + * A read-only registry of services used during experiments to resolve services. + * + * The service registry is similar conceptually to the Domain Name System (DNS), which is a naming system used to + * identify computers reachable via the Internet. The service registry should be used in a similar fashion. + */ +public interface ServiceRegistry { + /** + * Lookup the service with the specified [name] and [type]. + * + * @param name The name of the service to resolve, which should follow the rules for domain names as defined by DNS. + * @param type The type of the service to resolve, identified by the interface that is implemented by the service. + * @return The service with specified [name] and implementing [type] or `null` if it does not exist. + */ + public fun <T : Any> resolve(name: String, type: Class<T>): T? + + /** + * Create a copy of the registry. + */ + public fun clone(): ServiceRegistry +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/internal/ServiceRegistryImpl.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/internal/ServiceRegistryImpl.kt new file mode 100644 index 00000000..c2e91730 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/internal/ServiceRegistryImpl.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.internal + +import org.opendc.experiments.MutableServiceRegistry + +/** + * Implementation of the [MutableServiceRegistry] interface. + */ +internal class ServiceRegistryImpl(private val registry: MutableMap<String, MutableMap<Class<*>, Any>> = mutableMapOf()) : + MutableServiceRegistry { + override fun <T : Any> resolve(name: String, type: Class<T>): T? { + val servicesForName = registry[name] ?: return null + + @Suppress("UNCHECKED_CAST") + return servicesForName[type] as T? + } + + override fun <T : Any> register(name: String, type: Class<T>, service: T) { + val services = registry.computeIfAbsent(name) { mutableMapOf() } + + if (type in services) { + throw IllegalStateException("Duplicate service $type registered for name $name") + } + + services[type] = service + } + + override fun remove(name: String, type: Class<*>) { + val services = registry[name] ?: return + services.remove(type) + } + + override fun remove(name: String) { + registry.remove(name) + } + + override fun clone(): MutableServiceRegistry { + val res = mutableMapOf<String, MutableMap<Class<*>, Any>>() + registry.mapValuesTo(res) { (_, v) -> v.toMutableMap() } + return ServiceRegistryImpl(res) + } + + override fun toString(): String { + val entries = registry.map { "${it.key}=${it.value}" }.joinToString() + return "ServiceRegistry{$entries}" + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt new file mode 100644 index 00000000..3a1c3144 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.provisioner + +import org.opendc.experiments.MutableServiceRegistry +import org.opendc.experiments.ServiceRegistry +import org.opendc.experiments.internal.ServiceRegistryImpl +import java.time.Clock +import java.util.* +import java.util.ArrayDeque +import kotlin.coroutines.CoroutineContext + +/** + * A helper class to set up the experimental environment in a reproducible manner. + * + * With this class, users describe the environment using multiple [ProvisioningStep]s. These re-usable + * [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared + * down after the simulation completes. + * + * @param coroutineContext The [CoroutineContext] in which the environment is set up. + * @param clock The simulation [Clock]. + * @param seed A seed for initializing the randomness of the environment. + */ +public class Provisioner(coroutineContext: CoroutineContext, clock: Clock, seed: Long) : AutoCloseable { + /** + * Implementation of [ProvisioningContext]. + */ + private val context = object : ProvisioningContext { + override val clock: Clock = clock + override val coroutineContext: CoroutineContext = coroutineContext + override val seeder: SplittableRandom = SplittableRandom(seed) + override val registry: MutableServiceRegistry = ServiceRegistryImpl() + + override fun toString(): String = "Provisioner.ProvisioningContext" + } + + /** + * The stack of handles to run during the clean-up process. + */ + private val stack = ArrayDeque<AutoCloseable>() + + /** + * The [ServiceRegistry] containing the services registered in this environment. + */ + public val registry: ServiceRegistry + get() = context.registry + + /** + * Run a single [ProvisioningStep] for this environment. + * + * @param step The step to apply to the environment. + */ + public fun runStep(step: ProvisioningStep) { + val handle = step.apply(context) + stack.push(handle) + } + + /** + * Run multiple [ProvisioningStep]s for this environment. + * + * @param steps The steps to apply to the environment. + */ + public fun runSteps(vararg steps: ProvisioningStep) { + val ctx = context + val stack = stack + for (step in steps) { + val handle = step.apply(ctx) + stack.push(handle) + } + } + + /** + * Clean-up the environment. + */ + override fun close() { + val stack = stack + while (stack.isNotEmpty()) { + stack.pop().close() + } + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt new file mode 100644 index 00000000..58f6844d --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.provisioner + +import org.opendc.experiments.MutableServiceRegistry +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as + * access to the simulation dispatcher (via [CoroutineContext]), the virtual clock, and a randomness seeder to allow + * the provisioning steps to initialize the (simulated) resources. + */ +public interface ProvisioningContext { + /** + * The [CoroutineContext] in which the provisioner runs. + */ + public val coroutineContext: CoroutineContext + + /** + * The [Clock] tracking the virtual simulation time. + */ + public val clock: Clock + + /** + * A [SplittableRandom] instance used to seed the provisioners. + */ + public val seeder: SplittableRandom + + /** + * A [MutableServiceRegistry] where the provisioned services are registered. + */ + public val registry: MutableServiceRegistry +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningStep.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningStep.kt new file mode 100644 index 00000000..e78f8d4f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningStep.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.provisioner + +import org.eclipse.microprofile.config.Config + +/** + * A provisioning step is responsible for provisioning (acquiring or configuring) infrastructure necessary for a + * simulation experiment. + */ +public fun interface ProvisioningStep { + /** + * Apply the step by provisioning the required resources for the experiment using the specified + * [ProvisioningContext][ctx]. + * + * @param ctx The environment in which the resources should be provisioned. + * @return A handle that is invoked once the simulation completes, so that the resources can be cleaned up. + */ + public fun apply(ctx: ProvisioningContext): AutoCloseable + + /** + * A factory interface for [ProvisioningStep] instances. + * + * @param S The type that describes the input for constructing a [ProvisioningStep]. + */ + public abstract class Provider<S>(public val type: Class<S>) { + /** + * The name that identifies the provisioning step. + */ + public abstract val name: String + + /** + * Construct a [ProvisioningStep] with the specified [spec]. + * + * @param spec The specification that describes the provisioner to be created. + * @param config The external configuration of the experiment runner. + * @return The [ProvisioningStep] constructed according to [spec]. + */ + public abstract fun create(spec: S, config: Config): ProvisioningStep + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/ServiceRegistryTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/ServiceRegistryTest.kt new file mode 100644 index 00000000..f69c07c1 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/ServiceRegistryTest.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.experiments.internal.ServiceRegistryImpl + +/** + * Test suite for the [ServiceRegistry] implementation. + */ +class ServiceRegistryTest { + @Test + fun testRetrievalSuccess() { + val registry = ServiceRegistryImpl() + + registry.register("opendc.org", String::class.java, "Comment") + + assertEquals("Comment", registry.resolve("opendc.org", String::class.java)) + } + + @Test + fun testRetrievalFailure() { + val registry = ServiceRegistryImpl() + assertNull(registry.resolve("opendc.org", String::class.java)) + } + + @Test + fun testDuplicate() { + val registry = ServiceRegistryImpl() + + registry.register("opendc.org", String::class.java, "Comment") + + assertThrows<IllegalStateException> { registry.register("opendc.org", String::class.java, "Comment2") } + } + + @Test + fun testRemove() { + val registry = ServiceRegistryImpl() + + registry.register("opendc.org", String::class.java, "Comment") + registry.remove("opendc.org", String::class.java) + + assertAll( + { assertDoesNotThrow { registry.remove("opendc.org", String::class.java) } }, + { assertNull(registry.resolve("opendc.org", String::class.java)) } + ) + } + + @Test + fun testRemoveNonExistent() { + val registry = ServiceRegistryImpl() + + assertAll( + { assertNull(registry.resolve("opendc.org", String::class.java)) }, + { assertDoesNotThrow { registry.remove("opendc.org", String::class.java) } } + ) + } + + @Test + fun testRemoveAll() { + val registry = ServiceRegistryImpl() + + registry.register("opendc.org", String::class.java, "Comment") + registry.register("opendc.org", Int::class.java, 1) + + println(registry) + + registry.remove("opendc.org") + + assertAll( + { assertNull(registry.resolve("opendc.org", String::class.java)) }, + { assertNull(registry.resolve("opendc.org", Int::class.java)) }, + ) + } + + @Test + fun testClone() { + val registry = ServiceRegistryImpl() + registry.register("opendc.org", String::class.java, "Comment") + + val clone = registry.clone() + clone.remove("opendc.org") + + assertAll( + { assertEquals("Comment", registry.resolve("opendc.org", String::class.java)) }, + { assertNull(clone.resolve("opendc.org", String::class.java)) } + ) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 8320179a..e19784ba 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -32,7 +32,7 @@ plugins { } dependencies { - api(projects.opendcCompute.opendcComputeWorkload) + api(projects.opendcExperiments.opendcExperimentsCompute) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index c09ce96a..f021e223 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -22,15 +22,16 @@ package org.opendc.experiments.capelin +import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.workload.* -import org.opendc.compute.workload.topology.Topology -import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology +import org.opendc.experiments.compute.* +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import org.openjdk.jmh.annotations.* import java.io.File @@ -46,7 +47,7 @@ import java.util.concurrent.TimeUnit @Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) class CapelinBenchmarks { private lateinit var vms: List<VirtualMachine> - private lateinit var topology: Topology + private lateinit var topology: List<HostSpec> @Param("true", "false") private var isOptimized: Boolean = false @@ -54,29 +55,27 @@ class CapelinBenchmarks { @Setup fun setUp() { val loader = ComputeWorkloadLoader(File("src/test/resources/trace")) - val source = trace("bitbrains-small") vms = trace("bitbrains-small").resolve(loader, Random(1L)) topology = checkNotNull(object {}.javaClass.getResourceAsStream("/topology.txt")).use { clusterTopology(it) } } @Benchmark fun benchmarkCapelin() = runBlockingSimulation { - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0L - ) + val serviceDomain = "compute.opendc.org" - try { - runner.apply(topology, isOptimized) - runner.run(vms, interference = true) - } finally { - runner.close() + Provisioner(coroutineContext, clock, seed = 0).use { provisioner -> + val computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + + provisioner.runSteps( + setupComputeService(serviceDomain, { computeScheduler }), + setupHosts(serviceDomain, topology, optimize = isOptimized) + ) + + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + service.replay(clock, vms, 0L, interference = true) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index dbb5ced3..f1214b08 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -22,15 +22,12 @@ package org.opendc.experiments.capelin -import org.opendc.compute.workload.ComputeServiceHelper -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor -import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.ComputeMetricReader -import org.opendc.compute.workload.topology.apply +import org.opendc.compute.service.ComputeService import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.topology.clusterTopology +import org.opendc.experiments.compute.* +import org.opendc.experiments.compute.export.parquet.ParquetComputeMonitor +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import java.io.File import java.time.Duration @@ -58,54 +55,41 @@ public class CapelinRunner( * Run a single [scenario] with the specified seed. */ fun runScenario(scenario: Scenario, seed: Long) = runBlockingSimulation { - val seeder = Random(seed) - - val operationalPhenomena = scenario.operationalPhenomena - val computeScheduler = createComputeScheduler(scenario.allocationPolicy, seeder) - val failureModel = - if (operationalPhenomena.failureFrequency > 0) - grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) - else - null - val vms = scenario.workload.source.resolve(workloadLoader, seeder) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed, - ) - + val serviceDomain = "compute.opendc.org" val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) - val partitions = scenario.partitions + ("seed" to seed.toString()) - val partition = partitions.map { (k, v) -> "$k=$v" }.joinToString("/") - val exporter = if (outputPath != null) { - ComputeMetricReader( - this, - clock, - runner.service, - ParquetComputeMonitor( - outputPath, - partition, - bufferSize = 4096 - ), - exportInterval = Duration.ofMinutes(5) + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }), + setupHosts(serviceDomain, topology, optimize = true) ) - } else { - null - } - try { - // Instantiate the desired topology - runner.apply(topology, optimize = true) + if (outputPath != null) { + val partitions = scenario.partitions + ("seed" to seed.toString()) + val partition = partitions.map { (k, v) -> "$k=$v" }.joinToString("/") + + provisioner.runStep( + registerComputeMonitor( + serviceDomain, + ParquetComputeMonitor( + outputPath, + partition, + bufferSize = 4096 + ) + ) + ) + } - // Run the workload trace - runner.run(vms, failureModel = failureModel, interference = operationalPhenomena.hasInterference) + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + val vms = scenario.workload.source.resolve(workloadLoader, Random(seed)) + val operationalPhenomena = scenario.operationalPhenomena + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) + else + null - // Stop the metric collection - exporter?.close() - } finally { - runner.close() + service.replay(clock, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt index fe16a294..c90194ce 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt @@ -23,6 +23,6 @@ package org.opendc.experiments.capelin.model /** - * The topology topology on which we test the workload. + * The topology on which we simulate the workload. */ public data class Topology(val name: String) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt index a2e71243..ed2588f0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.model -import org.opendc.compute.workload.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkload /** * A single workload originating from a trace. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt index 68eb15b3..80b8859c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.composite -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.composite +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the effect of a composite workload. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt index 0d7f3072..f3c002ac 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the difference between horizontal and vertical scaling. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt index 6afffc09..22f9f3ac 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt @@ -22,13 +22,13 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByHpc -import org.opendc.compute.workload.sampleByHpcLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByHpc +import org.opendc.experiments.compute.sampleByHpcLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] to explore the effect of HPC workloads. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt index 92bf80b3..e63a5807 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt index f9a9d681..12570108 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt @@ -22,12 +22,12 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.sampleByLoad +import org.opendc.experiments.compute.trace /** * A [Portfolio] that explores the effect of operational phenomena on metrics. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt index 944e9f43..6f126b87 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt @@ -22,11 +22,11 @@ package org.opendc.experiments.capelin.portfolio -import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.trace /** * A [Portfolio] to perform a simple test run. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt index 5ab4261a..054adfcd 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt @@ -23,8 +23,7 @@ @file:JvmName("TopologyFactories") package org.opendc.experiments.capelin.topology -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.compute.workload.topology.Topology +import org.opendc.experiments.compute.topology.HostSpec import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -43,61 +42,55 @@ import kotlin.math.roundToLong private val reader = ClusterSpecReader() /** - * Construct a [Topology] from the specified [file]. + * Construct a topology from the specified [file]. */ fun clusterTopology( file: File, powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0), random: Random = Random(0) -): Topology = clusterTopology(reader.read(file), powerModel, random) +): List<HostSpec> { + return clusterTopology(reader.read(file), powerModel, random) +} /** - * Construct a [Topology] from the specified [input]. + * Construct a topology from the specified [input]. */ fun clusterTopology( input: InputStream, powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0), random: Random = Random(0) -): Topology = clusterTopology(reader.read(input), powerModel, random) +): List<HostSpec> { + return clusterTopology(reader.read(input), powerModel, random) +} /** - * Construct a [Topology] from the given list of [clusters]. + * Construct a topology from the given list of [clusters]. */ -fun clusterTopology( - clusters: List<ClusterSpec>, - powerModel: PowerModel, - random: Random = Random(0) -): Topology { - return object : Topology { - override fun resolve(): List<HostSpec> { - val hosts = mutableListOf<HostSpec>() - for (cluster in clusters) { - val cpuSpeed = cluster.cpuSpeed - val memoryPerHost = cluster.memCapacityPerHost.roundToLong() - - val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cluster.cpuCountPerHost) - val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) - val machineModel = MachineModel( - List(cluster.cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) }, - listOf(unknownMemoryUnit) - ) - - repeat(cluster.hostCount) { - val spec = HostSpec( - UUID(random.nextLong(), it.toLong()), - "node-${cluster.name}-$it", - mapOf("cluster" to cluster.id), - machineModel, - SimplePowerDriver(powerModel) - ) +fun clusterTopology(clusters: List<ClusterSpec>, powerModel: PowerModel, random: Random = Random(0)): List<HostSpec> { + return clusters.flatMap { it.toHostSpecs(random, powerModel) } +} - hosts += spec - } - } +/** + * Helper method to convert a [ClusterSpec] into a list of [HostSpec]s. + */ +private fun ClusterSpec.toHostSpecs(random: Random, powerModel: PowerModel): List<HostSpec> { + val cpuSpeed = cpuSpeed + val memoryPerHost = memCapacityPerHost.roundToLong() - return hosts - } + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cpuCountPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + val machineModel = MachineModel( + List(cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) }, + listOf(unknownMemoryUnit) + ) - override fun toString(): String = "ClusterSpecTopology" + return List(hostCount) { + HostSpec( + UUID(random.nextLong(), it.toLong()), + "node-$name-$it", + mapOf("cluster" to id), + machineModel, + SimplePowerDriver(powerModel) + ) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index eae3c993..9be2d522 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,18 +26,19 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.ComputeMetricReader -import org.opendc.compute.workload.telemetry.ComputeMonitor -import org.opendc.compute.workload.telemetry.table.HostTableReader -import org.opendc.compute.workload.topology.Topology -import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology +import org.opendc.experiments.compute.* +import org.opendc.experiments.compute.telemetry.ComputeMonitor +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import java.io.File import java.time.Duration @@ -82,45 +83,41 @@ class CapelinIntegrationTest { fun testLarge() = runBlockingSimulation { val seed = 0L val workload = createTestWorkload(1.0, seed) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed, - ) val topology = createTopology() - val reader = ComputeMetricReader(this, clock, runner.service, monitor) - - try { - runner.apply(topology) - runner.run(workload) + val monitor = monitor - val serviceMetrics = runner.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840939264814157E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }, - ) - } finally { - runner.close() - reader.close() + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed) } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}" + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, + { assertEquals(223393683, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840939264814157E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + ) } /** @@ -130,40 +127,36 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1L val workload = createTestWorkload(0.25, seed) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed, - ) val topology = createTopology("single") - val reader = ComputeMetricReader(this, clock, runner.service, monitor) - - try { - runner.apply(topology) - runner.run(workload) + val monitor = monitor - val serviceMetrics = runner.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), ) - } finally { - runner.close() - reader.close() + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed) } + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}" + ) + // Note that these values have been verified beforehand assertAll( - { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.011676470304312E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(10999592, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741207, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.011676470304312E8, monitor.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -174,40 +167,34 @@ class CapelinIntegrationTest { fun testInterference() = runBlockingSimulation { val seed = 0L val workload = createTestWorkload(1.0, seed) - - val simulator = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed - ) val topology = createTopology("single") - val reader = ComputeMetricReader(this, clock, simulator.service, monitor) - try { - simulator.apply(topology) - simulator.run(workload, interference = true) - - val serviceMetrics = simulator.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), ) - } finally { - simulator.close() - reader.close() + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed, interference = true) } + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}" + ) + // Note that these values have been verified beforehand assertAll( - { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(485510, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6028050, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712749, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532907, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(470593, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -217,41 +204,28 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 0L - val simulator = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed - ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val reader = ComputeMetricReader(this, clock, simulator.service, monitor) + val monitor = monitor - try { - simulator.apply(topology) - simulator.run(workload, failureModel = grid5000(Duration.ofDays(7))) - - val serviceMetrics = simulator.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), ) - } finally { - simulator.close() - reader.close() + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed, failureModel = grid5000(Duration.ofDays(7))) } // Note that these values have been verified beforehand assertAll( - { assertEquals(10982026, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740058, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(10085158, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(8539158, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2590260605, monitor.uptime) { "Uptime incorrect" } }, + { assertEquals(2328039558, monitor.uptime) { "Uptime incorrect" } }, ) } @@ -266,12 +240,26 @@ class CapelinIntegrationTest { /** * Obtain the topology factory for the test. */ - private fun createTopology(name: String = "topology"): Topology { + private fun createTopology(name: String = "topology"): List<HostSpec> { val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt")) return stream.use { clusterTopology(stream) } } class TestComputeMonitor : ComputeMonitor { + var attemptsSuccess = 0 + var attemptsFailure = 0 + var attemptsError = 0 + var serversPending = 0 + var serversActive = 0 + + override fun record(reader: ServiceTableReader) { + attemptsSuccess = reader.attemptsSuccess + attemptsFailure = reader.attemptsFailure + attemptsError = reader.attemptsError + serversPending = reader.serversPending + serversActive = reader.serversActive + } + var idleTime = 0L var activeTime = 0L var stealTime = 0L diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt new file mode 100644 index 00000000..2aeb9ff9 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Scenario +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.trace +import java.io.File +import java.nio.file.Files + +/** + * Test suite for [CapelinRunner]. + */ +class CapelinRunnerTest { + /** + * The path to the environments. + */ + private val envPath = File("src/test/resources/env") + + /** + * The path to the traces. + */ + private val tracePath = File("src/test/resources/trace") + + /** + * Smoke test with output. + */ + @Test + fun testSmoke() { + val outputPath = Files.createTempDirectory("output").toFile() + + try { + val runner = CapelinRunner(envPath, tracePath, outputPath) + val scenario = Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers" + ) + + assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } + } finally { + outputPath.delete() + } + } + + /** + * Smoke test without output. + */ + @Test + fun testSmokeNoOutput() { + val runner = CapelinRunner(envPath, tracePath, null) + val scenario = Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers" + ) + + assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } + } +} diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-experiments/opendc-experiments-compute/build.gradle.kts index 7b5fe6c1..5cae1d43 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-compute/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,9 +25,13 @@ description = "Support library for simulating VM-based workloads with OpenDC" /* Build configuration */ plugins { `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` } dependencies { + api(projects.opendcCompute.opendcComputeService) + api(projects.opendcExperiments.opendcExperimentsBase) api(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTrace.opendcTraceApi) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt index c94f30e4..1731a4ac 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt @@ -21,7 +21,7 @@ */ @file:JvmName("ComputeSchedulers") -package org.opendc.compute.workload +package org.opendc.experiments.compute import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt new file mode 100644 index 00000000..38cbf2dc --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import java.time.Duration + +/** + * A [ProvisioningStep] that provisions a [ComputeService] without any hosts. + * + * @param serviceDomain The domain name under which to register the compute service. + * @param scheduler A function to construct the compute scheduler. + * @param schedulingQuantum The scheduling quantum of the compute scheduler. + */ +public class ComputeServiceProvisioningStep internal constructor( + private val serviceDomain: String, + private val scheduler: (ProvisioningContext) -> ComputeScheduler, + private val schedulingQuantum: Duration +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val service = ComputeService(ctx.coroutineContext, ctx.clock, scheduler(ctx), schedulingQuantum) + ctx.registry.register(serviceDomain, ComputeService::class.java, service) + + return AutoCloseable { service.close() } + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt new file mode 100644 index 00000000..3ae4b0df --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSteps") +package org.opendc.experiments.compute + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.experiments.compute.telemetry.ComputeMonitor +import org.opendc.experiments.compute.telemetry.ComputeMonitorProvisioningStep +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import java.time.Duration + +/** + * Return a [ProvisioningStep] that provisions a [ComputeService] without any hosts. + * + * @param serviceDomain The domain name under which to register the compute service. + * @param scheduler A function to construct the compute scheduler. + * @param schedulingQuantum The scheduling quantum of the compute scheduler. + */ +public fun setupComputeService( + serviceDomain: String, + scheduler: (ProvisioningContext) -> ComputeScheduler, + schedulingQuantum: Duration = Duration.ofMinutes(5) +): ProvisioningStep { + return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum) +} + +/** + * Return a [ProvisioningStep] that installs a [ComputeMetricReader] to periodically collect the metrics of a + * [ComputeService] and report them to a [ComputeMonitor]. + * + * @param serviceDomain The service domain at which the [ComputeService] is located. + * @param monitor The [ComputeMonitor] to install. + * @param exportInterval The interval between which to collect the metrics. + */ +public fun registerComputeMonitor( + serviceDomain: String, + monitor: ComputeMonitor, + exportInterval: Duration = Duration.ofMinutes(5) +): ProvisioningStep { + return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval) +} + +/** + * Return a [ProvisioningStep] that sets up the specified list of hosts (based on [specs]) for the specified compute + * service. + * + * @param serviceDomain The domain name under which the compute service is registered. + * @param specs A list of [HostSpec] objects describing the simulated hosts to provision. + * @param optimize A flag to indicate that the CPU resources of the host should be merged into a single CPU resource. + */ +public fun setupHosts(serviceDomain: String, specs: List<HostSpec>, optimize: Boolean = false): ProvisioningStep { + return HostsProvisioningStep(serviceDomain, specs, optimize) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt index 78002c2f..3db980ca 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload +package org.opendc.experiments.compute import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt index 387a3ec2..f92e10e3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload +package org.opendc.experiments.compute import mu.KotlinLogging import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt index 2f4935ca..732f761e 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt @@ -21,12 +21,12 @@ */ @file:JvmName("ComputeWorkloads") -package org.opendc.compute.workload +package org.opendc.experiments.compute -import org.opendc.compute.workload.internal.CompositeComputeWorkload -import org.opendc.compute.workload.internal.HpcSampledComputeWorkload -import org.opendc.compute.workload.internal.LoadSampledComputeWorkload -import org.opendc.compute.workload.internal.TraceComputeWorkload +import org.opendc.experiments.compute.internal.CompositeComputeWorkload +import org.opendc.experiments.compute.internal.HpcSampledComputeWorkload +import org.opendc.experiments.compute.internal.LoadSampledComputeWorkload +import org.opendc.experiments.compute.internal.TraceComputeWorkload /** * Construct a workload from a trace. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt index 4d9ef15d..f96b7e16 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload +package org.opendc.experiments.compute import org.opendc.compute.service.ComputeService import org.opendc.compute.simulator.failure.HostFaultInjector diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt index be7120b9..00bf44a1 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt @@ -21,7 +21,7 @@ */ @file:JvmName("FailureModels") -package org.opendc.compute.workload +package org.opendc.experiments.compute import org.apache.commons.math3.distribution.LogNormalDistribution import org.apache.commons.math3.random.Well19937c diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt new file mode 100644 index 00000000..28c9bc01 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.kernel.SimHypervisor +import org.opendc.simulator.flow.FlowEngine +import java.util.* + +/** + * A [ProvisioningStep] that provisions a list of hosts for a [ComputeService]. + * + * @param serviceDomain The domain name under which the compute service is registered. + * @param specs A list of [HostSpec] objects describing the simulated hosts to provision. + * @param optimize A flag to indicate that the CPU resources of the host should be merged into a single CPU resource. + */ +public class HostsProvisioningStep internal constructor( + private val serviceDomain: String, + private val specs: List<HostSpec>, + private val optimize: Boolean +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } + val engine = FlowEngine(ctx.coroutineContext, ctx.clock) + val hosts = mutableSetOf<SimHost>() + + for (spec in specs) { + val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver) + val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, SplittableRandom(ctx.seeder.nextLong())) + + val host = SimHost( + spec.uid, + spec.name, + spec.meta, + ctx.coroutineContext, + ctx.clock, + machine, + hypervisor, + optimize = optimize + ) + + require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } + service.addHost(host) + } + + return AutoCloseable { + for (host in hosts) { + host.close() + } + } + } +} diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt new file mode 100644 index 00000000..0df9305a --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.experiments.compute + +import kotlinx.coroutines.* +import org.opendc.compute.service.ComputeService +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.time.Clock +import java.util.* +import kotlin.coroutines.coroutineContext +import kotlin.math.max + +/** + * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. + * + * @param clock The simulation clock. + * @param trace The trace to simulate. + * @param seed The seed to use for randomness. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param failureModel A failure model to use for injecting failures. + * @param interference A flag to indicate that VM interference needs to be enabled. + */ +public suspend fun ComputeService.replay( + clock: Clock, + trace: List<VirtualMachine>, + seed: Long, + submitImmediately: Boolean = false, + failureModel: FailureModel? = null, + interference: Boolean = false +) { + val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed)) + val client = newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() + + if (offset < 0) { + offset = start - now + } + + // Make sure the trace entries are ordered by submission time + assert(start - offset >= 0) { "Invalid trace order" } + + if (!submitImmediately) { + delay(max(0, (start - offset) - now)) + } + + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload(entry.trace, workloadOffset) + val meta = mutableMapOf<String, Any>("workload" to workload) + + val interferenceProfile = entry.interferenceProfile + if (interference && interferenceProfile != null) { + meta["interference-profile"] = interferenceProfile + } + + launch { + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.cpuCount, + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() + ), + meta = meta + ) + + // Wait for the server reach its end time + val endTime = entry.stopTime.toEpochMilli() + delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) + + // Stop the server after reaching the end-time of the virtual machine + server.stop() + } + } + } + + yield() + } finally { + injector?.close() + client.close() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt index 8560b537..3ed497a0 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload +package org.opendc.experiments.compute import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile import org.opendc.simulator.compute.workload.SimTrace diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt index af4dad44..a104851f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet -import org.opendc.compute.workload.telemetry.ComputeMonitor -import org.opendc.compute.workload.telemetry.table.HostTableReader -import org.opendc.compute.workload.telemetry.table.ServerTableReader -import org.opendc.compute.workload.telemetry.table.ServiceTableReader +import org.opendc.experiments.compute.telemetry.ComputeMonitor +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServerTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt index c854d874..60629a95 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import mu.KotlinLogging import org.apache.parquet.column.ParquetProperties diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt index e6e7e42d..cf0a3bf2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,14 +20,14 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.HostTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt index 082c7c88..1622289e 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,14 +20,14 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.experiments.compute.telemetry.table.ServerTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt index 2a0fdca1..0c466d39 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,13 +20,13 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.compute.workload.telemetry.table.ServiceTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt index 050e0f0a..a3f2d597 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import org.apache.parquet.io.api.Binary import java.nio.ByteBuffer diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt index 9b2bec55..75779088 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.workload.internal +package org.opendc.experiments.compute.internal import mu.KotlinLogging -import org.opendc.compute.workload.ComputeWorkload -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine import java.util.* /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt index 52f4c672..23efb154 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.workload.internal +package org.opendc.experiments.compute.internal import mu.KotlinLogging -import org.opendc.compute.workload.ComputeWorkload -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine import java.util.* /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt index ef6de729..4663c59e 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.workload.internal +package org.opendc.experiments.compute.internal import mu.KotlinLogging -import org.opendc.compute.workload.ComputeWorkload -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine import java.util.* /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt index c20cb8f3..1cfee3bd 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.workload.internal +package org.opendc.experiments.compute.internal -import org.opendc.compute.workload.ComputeWorkload -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.experiments.compute.ComputeWorkload +import org.opendc.experiments.compute.ComputeWorkloadLoader +import org.opendc.experiments.compute.VirtualMachine import java.util.* /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt index a0ec4bd6..088f98e9 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry +package org.opendc.experiments.compute.telemetry import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay @@ -30,7 +30,7 @@ import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host -import org.opendc.compute.workload.telemetry.table.* +import org.opendc.experiments.compute.telemetry.table.* import java.time.Clock import java.time.Duration import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt index 36a2079a..ff36bef3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry +package org.opendc.experiments.compute.telemetry -import org.opendc.compute.workload.telemetry.table.HostTableReader -import org.opendc.compute.workload.telemetry.table.ServerTableReader -import org.opendc.compute.workload.telemetry.table.ServiceTableReader +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServerTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader /** * A monitor that tracks the metrics and events of the OpenDC Compute service. diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt new file mode 100644 index 00000000..68ca5ae8 --- /dev/null +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.compute.telemetry + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel +import org.opendc.compute.service.ComputeService +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import java.time.Duration + +/** + * A [ProvisioningStep] that provisions a [ComputeMetricReader] to periodically collect the metrics of a [ComputeService] + * and report them to a [ComputeMonitor]. + */ +public class ComputeMonitorProvisioningStep internal constructor( + private val serviceDomain: String, + private val monitor: ComputeMonitor, + private val exportInterval: Duration +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val scope = CoroutineScope(ctx.coroutineContext + Job()) + val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } + val metricReader = ComputeMetricReader(scope, ctx.clock, service, monitor, exportInterval) + + return AutoCloseable { + metricReader.close() + scope.cancel() + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt index 5d383e40..84dd7a4f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry.table +package org.opendc.experiments.compute.telemetry.table /** * Information about a host exposed to the telemetry service. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt index 8f6f0d01..e6953550 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry.table +package org.opendc.experiments.compute.telemetry.table import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt index 111135b7..fc360fee 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry.table +package org.opendc.experiments.compute.telemetry.table /** * Static information about a server exposed to the telemetry service. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt index bccccd01..c4e2fb4c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry.table +package org.opendc.experiments.compute.telemetry.table import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt index a1df6ea7..394c6bd6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry.table +package org.opendc.experiments.compute.telemetry.table import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt index 4211ab15..0155a879 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.workload.telemetry.table +package org.opendc.experiments.compute.telemetry.table import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt index 87530f5a..8ade963a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt @@ -20,10 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.workload.topology +package org.opendc.experiments.compute.topology import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* @@ -42,6 +44,6 @@ public data class HostSpec( val name: String, val meta: Map<String, Any>, val model: MachineModel, - val powerDriver: PowerDriver, + val powerDriver: PowerDriver = SimplePowerDriver(LinearPowerModel(350.0, idlePower = 200.0)), val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() ) diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt index 4344bb08..52b94324 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt +++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt @@ -20,13 +20,13 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.compute.workload.telemetry.table.HostInfo -import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.HostInfo +import org.opendc.experiments.compute.telemetry.table.HostTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt index 8465871d..0ba93173 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt +++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt @@ -20,14 +20,14 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.compute.workload.telemetry.table.HostInfo -import org.opendc.compute.workload.telemetry.table.ServerInfo -import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.experiments.compute.telemetry.table.HostInfo +import org.opendc.experiments.compute.telemetry.table.ServerInfo +import org.opendc.experiments.compute.telemetry.table.ServerTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt index d91982bc..20301185 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt +++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.workload.export.parquet +package org.opendc.experiments.compute.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.compute.workload.telemetry.table.ServiceTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-faas/opendc-faas-workload/build.gradle.kts b/opendc-experiments/opendc-experiments-faas/build.gradle.kts index 37c74d7e..8230c74d 100644 --- a/opendc-faas/opendc-faas-workload/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-faas/build.gradle.kts @@ -25,9 +25,12 @@ description = "Support library for simulating FaaS workloads with OpenDC" /* Build configuration */ plugins { `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` } dependencies { + api(projects.opendcExperiments.opendcExperimentsBase) api(projects.opendcFaas.opendcFaasSimulator) implementation(libs.kotlin.logging) diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt new file mode 100644 index 00000000..d977042e --- /dev/null +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.faas + +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import org.opendc.faas.service.FaaSService +import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy +import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.simulator.SimFunctionDeployer +import org.opendc.faas.simulator.delay.ColdStartModel +import org.opendc.faas.simulator.delay.StochasticDelayInjector +import org.opendc.faas.simulator.delay.ZeroDelayInjector +import org.opendc.simulator.compute.model.MachineModel +import java.util.* + +/** + * A [ProvisioningStep] implementation for a [FaaSService]. + * + * @param serviceDomain The domain name under which to register the compute service. + * @param routingPolicy The routing policy to use. + * @param terminationPolicy The function termination policy to use. + * @param machineModel The [MachineModel] that models the physical machine on which the functions run. + * @param coldStartModel The cold start models to test. + */ +public class FaaSServiceProvisioningStep internal constructor( + private val serviceDomain: String, + private val routingPolicy: (ProvisioningContext) -> RoutingPolicy, + private val terminationPolicy: (ProvisioningContext) -> FunctionTerminationPolicy, + private val machineModel: MachineModel, + private val coldStartModel: ColdStartModel? +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val delayInjector = if (coldStartModel != null) + StochasticDelayInjector(coldStartModel, Random(ctx.seeder.nextLong())) + else + ZeroDelayInjector + val deployer = SimFunctionDeployer(ctx.coroutineContext, ctx.clock, machineModel, delayInjector) + val service = FaaSService( + ctx.coroutineContext, + ctx.clock, + deployer, + routingPolicy(ctx), + terminationPolicy(ctx) + ) + + ctx.registry.register(serviceDomain, FaaSService::class.java, service) + + return AutoCloseable { service.close() } + } +} diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSSteps.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSSteps.kt new file mode 100644 index 00000000..40e5627b --- /dev/null +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSSteps.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FaaSSteps") +package org.opendc.experiments.faas + +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import org.opendc.faas.service.FaaSService +import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy +import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.simulator.delay.ColdStartModel +import org.opendc.simulator.compute.model.MachineModel + +/** + * Return a [ProvisioningStep] that sets up a [FaaSService]. + * + * @param serviceDomain The domain name under which to register the compute service. + * @param routingPolicy The routing policy to use. + * @param terminationPolicy The function termination policy to use. + * @param machineModel The [MachineModel] that models the physical machine on which the functions run. + * @param coldStartModel The cold start models to test. + */ +public fun setupFaaSService( + serviceDomain: String, + routingPolicy: (ProvisioningContext) -> RoutingPolicy, + terminationPolicy: (ProvisioningContext) -> FunctionTerminationPolicy, + machineModel: MachineModel, + coldStartModel: ColdStartModel? = null +): ProvisioningStep { + return FaaSServiceProvisioningStep(serviceDomain, routingPolicy, terminationPolicy, machineModel, coldStartModel) +} diff --git a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FunctionSample.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionSample.kt index 418f895d..4ce2b136 100644 --- a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FunctionSample.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionSample.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.faas.workload +package org.opendc.experiments.faas /** * A sample of a single function. diff --git a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FunctionTrace.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTrace.kt index 712267e5..5268811c 100644 --- a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FunctionTrace.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTrace.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.faas.workload +package org.opendc.experiments.faas /** * A trace for a single function diff --git a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTraceWorkload.kt index cdb800c3..90e76dac 100644 --- a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FunctionTraceWorkload.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FunctionTraceWorkload.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.faas.workload +package org.opendc.experiments.faas import org.opendc.faas.simulator.workload.SimFaaSWorkload import org.opendc.simulator.compute.workload.SimTrace diff --git a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/ServerlessTraceReader.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/ServerlessTraceReader.kt index 3694cf30..7b6b3ef7 100644 --- a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/ServerlessTraceReader.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/ServerlessTraceReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.faas.workload +package org.opendc.experiments.faas import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvFactory diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt new file mode 100644 index 00000000..cf278606 --- /dev/null +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/TraceHelpers.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.experiments.faas + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.faas.service.FaaSService +import java.time.Clock +import kotlin.math.max + +/** + * Run a simulation of the [FaaSService] by replaying the workload trace given by [trace]. + * + * @param clock A [Clock] instance tracking simulation time. + * @param trace The trace to simulate. + */ +public suspend fun FaaSService.replay(clock: Clock, trace: List<FunctionTrace>) { + val client = newClient() + try { + coroutineScope { + for (entry in trace) { + launch { + val workload = FunctionTraceWorkload(entry) + val function = client.newFunction(entry.id, entry.maxMemory.toLong(), meta = mapOf("workload" to workload)) + + var offset = Long.MIN_VALUE + + for (sample in entry.samples) { + if (sample.invocations == 0) { + continue + } + + if (offset < 0) { + offset = sample.timestamp - clock.millis() + } + + delay(max(0, (sample.timestamp - offset) - clock.millis())) + + repeat(sample.invocations) { + function.invoke() + } + } + } + } + } + } finally { + client.close() + } +} diff --git a/opendc-faas/opendc-faas-workload/src/test/kotlin/org/opendc/faas/workload/FaaSServiceHelperTest.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt index dbe024c0..98328d3e 100644 --- a/opendc-faas/opendc-faas-workload/src/test/kotlin/org/opendc/faas/workload/FaaSServiceHelperTest.kt +++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt @@ -20,11 +20,13 @@ * SOFTWARE. */ -package org.opendc.faas.workload +package org.opendc.experiments.faas +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll +import org.opendc.experiments.provisioner.Provisioner +import org.opendc.faas.service.FaaSService import org.opendc.faas.service.autoscaler.FunctionTerminationPolicyFixed import org.opendc.faas.service.router.RandomRoutingPolicy import org.opendc.faas.simulator.delay.ColdStartModel @@ -37,37 +39,40 @@ import java.io.File import java.time.Duration /** - * Integration test suite for the [FaaSServiceHelper] class. + * Integration test to demonstrate a FaaS experiment. */ -class FaaSServiceHelperTest { +class FaaSExperiment { /** * Smoke test that simulates a small trace. */ @Test fun testSmoke() = runBlockingSimulation { - val trace = ServerlessTraceReader().parse(File("src/test/resources/trace")) - val runner = FaaSServiceHelper( - coroutineContext, - clock, - createMachineModel(), - RandomRoutingPolicy(), - FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)), - coldStartModel = ColdStartModel.GOOGLE - ) + val faasService = "faas.opendc.org" - try { - runner.run(trace) - } finally { - runner.close() - } + Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + provisioner.runStep( + setupFaaSService( + faasService, + { RandomRoutingPolicy() }, + { FunctionTerminationPolicyFixed(it.coroutineContext, it.clock, timeout = Duration.ofMinutes(10)) }, + createMachineModel(), + coldStartModel = ColdStartModel.GOOGLE + ) + ) - val stats = runner.service.getSchedulerStats() + val service = provisioner.registry.resolve(faasService, FaaSService::class.java)!! - assertAll( - { assertEquals(14, stats.totalInvocations) }, - { assertEquals(2, stats.timelyInvocations) }, - { assertEquals(12, stats.delayedInvocations) }, - ) + val trace = ServerlessTraceReader().parse(File("src/test/resources/trace")) + service.replay(clock, trace) + + val stats = service.getSchedulerStats() + + assertAll( + { assertEquals(14, stats.totalInvocations) }, + { assertEquals(2, stats.timelyInvocations) }, + { assertEquals(12, stats.delayedInvocations) }, + ) + } } /** diff --git a/opendc-faas/opendc-faas-workload/src/test/kotlin/org/opendc/faas/workload/ServerlessTraceReaderTest.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/ServerlessTraceReaderTest.kt index e835d4d9..54071791 100644 --- a/opendc-faas/opendc-faas-workload/src/test/kotlin/org/opendc/faas/workload/ServerlessTraceReaderTest.kt +++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/ServerlessTraceReaderTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.faas.workload +package org.opendc.experiments.faas import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test diff --git a/opendc-faas/opendc-faas-workload/src/test/resources/trace/1.csv b/opendc-experiments/opendc-experiments-faas/src/test/resources/trace/1.csv index 03a10d07..03a10d07 100644 --- a/opendc-faas/opendc-faas-workload/src/test/resources/trace/1.csv +++ b/opendc-experiments/opendc-experiments-faas/src/test/resources/trace/1.csv diff --git a/opendc-faas/opendc-faas-workload/src/test/resources/trace/10.csv b/opendc-experiments/opendc-experiments-faas/src/test/resources/trace/10.csv index 0046b0e5..0046b0e5 100644 --- a/opendc-faas/opendc-faas-workload/src/test/resources/trace/10.csv +++ b/opendc-experiments/opendc-experiments-faas/src/test/resources/trace/10.csv diff --git a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts new file mode 100644 index 00000000..4fc34d2d --- /dev/null +++ b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support library for simulating workflows with OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(projects.opendcExperiments.opendcExperimentsBase) + api(projects.opendcWorkflow.opendcWorkflowApi) + + implementation(libs.kotlinx.coroutines) + implementation(projects.opendcCompute.opendcComputeService) + implementation(projects.opendcWorkflow.opendcWorkflowService) + implementation(projects.opendcSimulator.opendcSimulatorCompute) + implementation(projects.opendcTrace.opendcTraceApi) +} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt index 5f57723b..a15d3d5b 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,8 +21,11 @@ */ @file:JvmName("TraceHelpers") -package org.opendc.workflow.workload +package org.opendc.experiments.workflow +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.trace.* import org.opendc.trace.conv.* @@ -30,6 +33,8 @@ import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.WorkflowService +import java.time.Clock import java.util.* import kotlin.collections.HashMap import kotlin.collections.HashSet @@ -92,3 +97,34 @@ public fun Trace.toJobs(): List<Job> { return jobs.values.toList() } + +/** + * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished. + */ +public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) { + // Sort jobs by their arrival time + val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } + if (orderedJobs.isEmpty()) { + return + } + + // Wait until the trace is started + val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + var offset = 0L + + if (startTime != Long.MAX_VALUE) { + offset = startTime - clock.millis() + delay(offset.coerceAtLeast(0)) + } + + coroutineScope { + for (job in orderedJobs) { + val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + if (submitTime != Long.MAX_VALUE) { + delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) + } + + launch { invoke(job) } + } + } +} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt index d6a375b6..cb8056a7 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflow.workload +package org.opendc.experiments.workflow import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt new file mode 100644 index 00000000..a2d6a172 --- /dev/null +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.workflow + +import org.opendc.compute.service.ComputeService +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import org.opendc.workflow.service.WorkflowService +import java.time.Duration + +/** + * A [ProvisioningStep] that provisions a [WorkflowService]. + * + * @param serviceDomain The domain name under which to register the workflow service. + * @param computeService The domain name where the underlying compute service is located. + * @param scheduler The configuration of the scheduler of the workflow engine. + * @param schedulingQuantum The scheduling quantum of the compute scheduler. + */ +public class WorkflowServiceProvisioningStep internal constructor( + private val serviceDomain: String, + private val computeService: String, + private val scheduler: WorkflowSchedulerSpec, + private val schedulingQuantum: Duration +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val computeService = requireNotNull(ctx.registry.resolve(computeService, ComputeService::class.java)) { "Compute service $computeService does not exist" } + + val client = computeService.newClient() + val service = WorkflowService( + ctx.coroutineContext, + ctx.clock, + client, + scheduler.schedulingQuantum, + jobAdmissionPolicy = scheduler.jobAdmissionPolicy, + jobOrderPolicy = scheduler.jobOrderPolicy, + taskEligibilityPolicy = scheduler.taskEligibilityPolicy, + taskOrderPolicy = scheduler.taskOrderPolicy, + ) + ctx.registry.register(serviceDomain, WorkflowService::class.java, service) + + return AutoCloseable { + service.close() + client.close() + } + } +} diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt new file mode 100644 index 00000000..7aae3a9f --- /dev/null +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("WorkflowSteps") +package org.opendc.experiments.workflow + +import org.opendc.experiments.provisioner.ProvisioningStep +import org.opendc.workflow.service.WorkflowService +import java.time.Duration + +/** + * Return a [ProvisioningStep] that sets up a [WorkflowService]. + */ +public fun setupWorkflowService( + serviceDomain: String, + computeService: String, + scheduler: WorkflowSchedulerSpec, + schedulingQuantum: Duration = Duration.ofMinutes(5) +): ProvisioningStep { + return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum) +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt index 049f1cc7..18d16d06 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt @@ -28,7 +28,7 @@ import org.opendc.faas.service.FunctionObject * A [FunctionDeployer] is responsible for ensuring that an instance of an arbitrary function, a [FunctionInstance], * is deployed. * - * The function deployer should combines the configuration stored in the function registry, the parameters supplied by + * The function deployer should combine the configuration stored in the function registry, the parameters supplied by * the requester, and other factors into a decision of how the function should be deployed, including how many and * what kind of resources it should receive. * diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index a3d0d34e..22131b13 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -31,6 +31,7 @@ import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.simulator.delay.DelayInjector import org.opendc.faas.simulator.workload.SimFaaSWorkloadMapper +import org.opendc.faas.simulator.workload.SimMetaFaaSWorkloadMapper import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel @@ -41,6 +42,7 @@ import org.opendc.simulator.flow.FlowEngine import java.time.Clock import java.util.ArrayDeque import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @@ -48,12 +50,16 @@ import kotlin.coroutines.resumeWithException * A [FunctionDeployer] that uses that simulates the [FunctionInstance]s. */ public class SimFunctionDeployer( + context: CoroutineContext, private val clock: Clock, - private val scope: CoroutineScope, private val model: MachineModel, private val delayInjector: DelayInjector, - private val mapper: SimFaaSWorkloadMapper -) : FunctionDeployer { + private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper() +) : FunctionDeployer, AutoCloseable { + /** + * The [CoroutineScope] of this deployer. + */ + private val scope = CoroutineScope(context + Job()) override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance { val instance = Instance(function, listener) @@ -172,6 +178,10 @@ public class SimFunctionDeployer( } } + override fun close() { + scope.cancel() + } + /** * A function invocation request. */ diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/workload/SimMetaFaaSWorkloadMapper.kt index de4300c7..8da8bd19 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/workload/SimMetaFaaSWorkloadMapper.kt @@ -20,17 +20,15 @@ * SOFTWARE. */ -@file:JvmName("TopologyHelpers") -package org.opendc.compute.workload.topology +package org.opendc.faas.simulator.workload -import org.opendc.compute.workload.ComputeServiceHelper +import org.opendc.faas.service.FunctionObject /** - * Apply the specified [topology] to the given [ComputeServiceHelper]. + * A [SimFaaSWorkloadMapper] that maps a [FunctionObject] to a workload via the meta-data. */ -public fun ComputeServiceHelper.apply(topology: Topology, optimize: Boolean = false) { - val hosts = topology.resolve() - for (spec in hosts) { - registerHost(spec, optimize) +public class SimMetaFaaSWorkloadMapper(private val key: String = "workload") : SimFaaSWorkloadMapper { + override fun createWorkload(function: FunctionObject): SimFaaSWorkload { + return requireNotNull(function.meta[key]) as SimFaaSWorkload } } diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index d528558c..5b730089 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.faas.simulator import io.mockk.coVerify import io.mockk.spyk -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals @@ -50,7 +49,6 @@ import java.util.* /** * A test suite for the [FaaSService] implementation under simulated conditions. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimFaaSServiceTest { private lateinit var machineModel: MachineModel @@ -75,7 +73,7 @@ internal class SimFaaSServiceTest { }) val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) - val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } + val deployer = SimFunctionDeployer(coroutineContext, clock, machineModel, delayInjector) { workload } val service = FaaSService( coroutineContext, clock, @@ -91,6 +89,7 @@ internal class SimFaaSServiceTest { delay(2000) service.close() + deployer.close() yield() diff --git a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FaaSServiceHelper.kt b/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FaaSServiceHelper.kt deleted file mode 100644 index ede6ac54..00000000 --- a/opendc-faas/opendc-faas-workload/src/main/kotlin/org/opendc/faas/workload/FaaSServiceHelper.kt +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.faas.workload - -import kotlinx.coroutines.* -import mu.KotlinLogging -import org.opendc.faas.api.FaaSFunction -import org.opendc.faas.service.FaaSService -import org.opendc.faas.service.FunctionObject -import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy -import org.opendc.faas.service.deployer.FunctionDeployer -import org.opendc.faas.service.deployer.FunctionInstance -import org.opendc.faas.service.deployer.FunctionInstanceListener -import org.opendc.faas.service.router.RoutingPolicy -import org.opendc.faas.simulator.SimFunctionDeployer -import org.opendc.faas.simulator.delay.ColdStartModel -import org.opendc.faas.simulator.delay.StochasticDelayInjector -import org.opendc.faas.simulator.delay.ZeroDelayInjector -import org.opendc.simulator.compute.model.MachineModel -import java.time.Clock -import java.util.* -import kotlin.coroutines.CoroutineContext -import kotlin.math.max - -/** - * Helper class to simulate FaaS-based workloads in OpenDC. - * - * @param context A [CoroutineContext] to run the simulation in. - * @param clock A [Clock] instance tracking simulation time. - * @param machineModel The [MachineModel] that models the physical machine on which the functions run. - * @param routingPolicy The routing policy to use. - * @param terminationPolicy The function termination policy to use. - * @param coldStartModel The cold start models to test. - * @param seed The seed of the simulation. - */ -public class FaaSServiceHelper( - private val context: CoroutineContext, - private val clock: Clock, - private val machineModel: MachineModel, - private val routingPolicy: RoutingPolicy, - private val terminationPolicy: FunctionTerminationPolicy, - private val coldStartModel: ColdStartModel? = null, -) : AutoCloseable { - /** - * The scope of this helper. - */ - private val scope = CoroutineScope(context + Job()) - - /** - * The logger for this class. - */ - private val logger = KotlinLogging.logger {} - - /** - * The simulated function deployer. - */ - private val deployer = object : FunctionDeployer { - override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): FunctionInstance { - val deployer = checkNotNull(_deployer) - return deployer.deploy(function, listener) - } - } - private var _deployer: SimFunctionDeployer? = null - - /** - * The [FaaSService] created by the helper. - */ - public val service: FaaSService = FaaSService( - context, - clock, - deployer, - routingPolicy, - terminationPolicy - ) - - /** - * Run a simulation of the [FaaSService] by replaying the workload trace given by [trace]. - * - * @param trace The trace to simulate. - * @param seed The seed for the simulation. - * @param functions The functions that have been created by the runner. - */ - public suspend fun run(trace: List<FunctionTrace>, seed: Long = 0, functions: MutableList<FaaSFunction>? = null) { - // Set up the simulated deployer - val delayInjector = if (coldStartModel != null) - StochasticDelayInjector(coldStartModel, Random(seed)) - else - ZeroDelayInjector - val traceById = trace.associateBy { it.id } - _deployer = SimFunctionDeployer(clock, scope, machineModel, delayInjector) { - FunctionTraceWorkload(traceById.getValue(it.name)) - } - - val client = service.newClient() - try { - coroutineScope { - for (entry in trace) { - launch { - val function = client.newFunction(entry.id, entry.maxMemory.toLong()) - functions?.add(function) - - var offset = Long.MIN_VALUE - - for (sample in entry.samples) { - if (sample.invocations == 0) { - continue - } - - if (offset < 0) { - offset = sample.timestamp - clock.millis() - } - - delay(max(0, (sample.timestamp - offset) - clock.millis())) - - logger.info { "Invoking function ${entry.id} ${sample.invocations} times [${sample.timestamp}]" } - - repeat(sample.invocations) { - function.invoke() - } - } - } - } - } - } finally { - client.close() - } - } - - override fun close() { - service.close() - scope.cancel() - } -} diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index a5723994..2679a97f 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -49,8 +49,7 @@ val cliJar by tasks.creating(Jar::class) { dependencies { api(projects.opendcWeb.opendcWebClient) - implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcCompute.opendcComputeWorkload) + implementation(projects.opendcExperiments.opendcExperimentsCompute) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcTrace.opendcTraceApi) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 9a1319b6..74f7c8c1 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,11 +23,10 @@ package org.opendc.web.runner import mu.KotlinLogging -import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.ComputeMetricReader -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.compute.workload.topology.Topology -import org.opendc.compute.workload.topology.apply +import org.opendc.compute.service.ComputeService +import org.opendc.experiments.compute.* +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -37,6 +36,7 @@ import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario +import org.opendc.web.proto.runner.Topology import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration @@ -74,7 +74,8 @@ public class OpenDCRunner( /** * The [ForkJoinPool] that is used to execute the simulation jobs. */ - private val pool = ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false) + private val pool = + ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false) /** * A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of @@ -129,11 +130,22 @@ public class OpenDCRunner( val id = job.id val scenario = job.scenario - val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS) + val heartbeat = scheduler.scheduleWithFixedDelay( + { manager.heartbeat(id) }, + 0, + heartbeatInterval.toMillis(), + TimeUnit.MILLISECONDS + ) try { val topology = convertTopology(scenario.topology) - val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) } + val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> + SimulationTask( + scenario, + repeat, + topology + ) + } val results = invokeAll(jobs).map { it.rawResult } logger.info { "Finished simulation for job $id" } @@ -188,128 +200,126 @@ public class OpenDCRunner( private inner class SimulationTask( private val scenario: Scenario, private val repeat: Int, - private val topology: Topology, + private val topology: List<HostSpec>, ) : RecursiveTask<WebComputeMonitor.Results>() { override fun compute(): WebComputeMonitor.Results { val monitor = WebComputeMonitor() // Schedule task that interrupts the simulation if it runs for too long. val currentThread = Thread.currentThread() - val interruptTask = scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS) + val interruptTask = + scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS) try { - runBlockingSimulation { - val workloadName = scenario.workload.trace.id - val workloadFraction = scenario.workload.samplingFraction - - val seeder = Random(repeat.toLong()) - - val phenomena = scenario.phenomena - val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder) - val workload = trace(workloadName).sampleByLoad(workloadFraction) - val vms = workload.resolve(workloadLoader, seeder) - - val failureModel = - if (phenomena.failures) - grid5000(Duration.ofDays(7)) - else - null - - val simulator = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0L, - ) - val reader = ComputeMetricReader(this, clock, simulator.service, monitor) - - try { - // Instantiate the topology onto the simulator - simulator.apply(topology) - // Run workload trace - simulator.run(vms, failureModel = failureModel, interference = phenomena.interference) - - val serviceMetrics = simulator.service.getSchedulerStats() - logger.debug { - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - } - } finally { - simulator.close() - reader.close() - } - } + runSimulation(monitor) } finally { interruptTask.cancel(false) } return monitor.collectResults() } + + /** + * Run a single simulation of the scenario. + */ + private fun runSimulation(monitor: WebComputeMonitor) = runBlockingSimulation { + val serviceDomain = "compute.opendc.org" + val seed = repeat.toLong() + + val scenario = scenario + + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService( + serviceDomain, + { createComputeScheduler(scenario.schedulerName, Random(it.seeder.nextLong())) } + ), + registerComputeMonitor(serviceDomain, monitor), + setupHosts(serviceDomain, topology) + ) + + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + + val workload = + trace(scenario.workload.trace.id).sampleByLoad(scenario.workload.samplingFraction) + val vms = workload.resolve(workloadLoader, Random(seed)) + + val phenomena = scenario.phenomena + val failureModel = + if (phenomena.failures) + grid5000(Duration.ofDays(7)) + else + null + + // Run workload trace + service.replay(clock, vms, seed, failureModel = failureModel, interference = phenomena.interference) + + val serviceMetrics = service.getSchedulerStats() + logger.debug { + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + } + } + } } /** * Convert the specified [topology] into an [Topology] understood by OpenDC. */ - private fun convertTopology(topology: org.opendc.web.proto.runner.Topology): Topology { - return object : Topology { - - override fun resolve(): List<HostSpec> { - val res = mutableListOf<HostSpec>() - val random = Random(0) - - val machines = topology.rooms.asSequence() - .flatMap { room -> - room.tiles.flatMap { tile -> - val rack = tile.rack - rack?.machines?.map { machine -> rack to machine } ?: emptyList() - } - } - for ((rack, machine) in machines) { - val clusterId = rack.id - val position = machine.position - - val processors = machine.cpus.flatMap { cpu -> - val cores = cpu.numberOfCores - val speed = cpu.clockRateMhz - // TODO Remove hard coding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.name, cores) - List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) - } - } - val memoryUnits = machine.memory.map { memory -> - MemoryUnit( - "Samsung", - memory.name, - memory.speedMbPerS, - memory.sizeMb.toLong() - ) - } - - val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } - val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) - val powerDriver = SimplePowerDriver(powerModel) - - val spec = HostSpec( - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$position", - mapOf("cluster" to clusterId), - MachineModel(processors, memoryUnits), - powerDriver - ) - - res += spec + private fun convertTopology(topology: Topology): List<HostSpec> { + val res = mutableListOf<HostSpec>() + val random = Random(0) + + val machines = topology.rooms.asSequence() + .flatMap { room -> + room.tiles.flatMap { tile -> + val rack = tile.rack + rack?.machines?.map { machine -> rack to machine } ?: emptyList() } + } - return res + for ((rack, machine) in machines) { + val clusterId = rack.id + val position = machine.position + + val processors = machine.cpus.flatMap { cpu -> + val cores = cpu.numberOfCores + val speed = cpu.clockRateMhz + // TODO Remove hard coding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.name, cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } } + val memoryUnits = machine.memory.map { memory -> + MemoryUnit( + "Samsung", + memory.name, + memory.speedMbPerS, + memory.sizeMb.toLong() + ) + } + + val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } + val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) + val powerDriver = SimplePowerDriver(powerModel) - override fun toString(): String = "WebRunnerTopologyFactory" + val spec = HostSpec( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf("cluster" to clusterId), + MachineModel(processors, memoryUnits), + powerDriver + ) + + res += spec } + + return res } /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 4c3d1cfa..76377c08 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -22,9 +22,9 @@ package org.opendc.web.runner.internal -import org.opendc.compute.workload.telemetry.ComputeMonitor -import org.opendc.compute.workload.telemetry.table.HostTableReader -import org.opendc.compute.workload.telemetry.table.ServiceTableReader +import org.opendc.experiments.compute.telemetry.ComputeMonitor +import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceTableReader import kotlin.math.max import kotlin.math.roundToLong diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index b6365885..6908a5af 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -33,9 +33,9 @@ dependencies { implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload) - testImplementation(projects.opendcCompute.opendcComputeWorkload) testImplementation(projects.opendcSimulator.opendcSimulatorCore) + testImplementation(projects.opendcExperiments.opendcExperimentsCompute) + testImplementation(projects.opendcExperiments.opendcExperimentsWorkflow) testImplementation(projects.opendcTrace.opendcTraceApi) testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 0fb8b67c..f8039e1d 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,17 +22,22 @@ package org.opendc.workflow.service +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.workload.ComputeServiceHelper -import org.opendc.compute.workload.topology.HostSpec +import org.opendc.experiments.compute.setupComputeService +import org.opendc.experiments.compute.setupHosts +import org.opendc.experiments.compute.topology.HostSpec +import org.opendc.experiments.provisioner.Provisioner +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.workflow.* import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -46,9 +51,6 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import org.opendc.workflow.workload.WorkflowSchedulerSpec -import org.opendc.workflow.workload.WorkflowServiceHelper -import org.opendc.workflow.workload.toJobs import java.nio.file.Paths import java.time.Duration import java.util.* @@ -63,55 +65,61 @@ internal class WorkflowServiceTest { */ @Test fun testTrace() = runBlockingSimulation { - // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) - ) + val computeService = "compute.opendc.org" + val workflowService = "workflow.opendc.org" - val computeHelper = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0, - schedulingQuantum = Duration.ofSeconds(1) - ) + Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + val scheduler: (ProvisioningContext) -> ComputeScheduler = { + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) + ) + } - val hostCount = 4 - repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } + provisioner.runSteps( + // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts + setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)), + setupHosts(computeService, List(4) { createHostSpec(it) }), - // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines - val workflowScheduler = WorkflowSchedulerSpec( - schedulingQuantum = Duration.ofMillis(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler) + // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines + setupWorkflowService( + workflowService, + computeService, + WorkflowSchedulerSpec( + schedulingQuantum = Duration.ofMillis(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + ) + ) + + val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!! - try { val trace = Trace.open( Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), format = "gwf" ) + service.replay(clock, trace.toJobs()) - workflowHelper.replay(trace.toJobs()) - } finally { - workflowHelper.close() - computeHelper.close() - } - - val metrics = workflowHelper.service.getSchedulerStats() + val metrics = service.getSchedulerStats() - assertAll( - { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, - { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } - ) + assertAll( + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { + assertEquals( + metrics.workflowsSubmitted, + metrics.workflowsFinished, + "Not all started jobs finished" + ) + }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } + ) + } } /** diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt deleted file mode 100644 index 435d0190..00000000 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.workload - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.opendc.compute.api.ComputeClient -import org.opendc.workflow.api.Job -import org.opendc.workflow.service.WorkflowService -import java.time.Clock -import kotlin.coroutines.CoroutineContext - -/** - * Helper class to simulate workflow-based workloads in OpenDC. - * - * @param context [CoroutineContext] to run the simulation in. - * @param clock [Clock] instance tracking simulation time. - * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler. - * @param schedulerSpec The configuration of the workflow scheduler. - */ -public class WorkflowServiceHelper( - private val context: CoroutineContext, - private val clock: Clock, - private val computeClient: ComputeClient, - private val schedulerSpec: WorkflowSchedulerSpec -) : AutoCloseable { - /** - * The [WorkflowService] that is constructed by this runner. - */ - public val service: WorkflowService = WorkflowService( - context, - clock, - computeClient, - schedulerSpec.schedulingQuantum, - jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, - jobOrderPolicy = schedulerSpec.jobOrderPolicy, - taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, - taskOrderPolicy = schedulerSpec.taskOrderPolicy, - ) - - /** - * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have - * finished. - */ - public suspend fun replay(jobs: List<Job>) { - // Sort jobs by their arrival time - val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } - if (orderedJobs.isEmpty()) { - return - } - - // Wait until the trace is started - val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long - var offset = 0L - - if (startTime != Long.MAX_VALUE) { - offset = startTime - clock.millis() - delay(offset.coerceAtLeast(0)) - } - - coroutineScope { - for (job in orderedJobs) { - val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long - if (submitTime != Long.MAX_VALUE) { - delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) - } - - launch { service.invoke(job) } - } - } - } - - override fun close() { - computeClient.close() - service.close() - } -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 860cbda5..c824f537 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -25,14 +25,15 @@ include(":opendc-common") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") -include(":opendc-compute:opendc-compute-workload") include(":opendc-workflow:opendc-workflow-api") include(":opendc-workflow:opendc-workflow-service") -include(":opendc-workflow:opendc-workflow-workload") include(":opendc-faas:opendc-faas-api") include(":opendc-faas:opendc-faas-service") include(":opendc-faas:opendc-faas-simulator") -include(":opendc-faas:opendc-faas-workload") +include(":opendc-experiments:opendc-experiments-base") +include(":opendc-experiments:opendc-experiments-compute") +include(":opendc-experiments:opendc-experiments-workflow") +include(":opendc-experiments:opendc-experiments-faas") include(":opendc-experiments:opendc-experiments-capelin") include(":opendc-experiments:opendc-experiments-tf20") include(":opendc-web:opendc-web-proto") |
