diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 14:01:55 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-09 14:01:55 +0100 |
| commit | 66c2501d95b167f9e7474a45e542f82d2d8e83ff (patch) | |
| tree | 7c3464a424891ab7c3cb9c0ac77d67256b144f97 | |
| parent | 2977dd8a5f1d742193eae79364a284e68269f7b5 (diff) | |
| parent | 75751865179c6cd5a05abb4a0641193595f59b45 (diff) | |
compute: Improvements to cloud compute model (v1)
This is the first of the pull requests in an attempt to improve the existing cloud compute model (see #86). This pull request restructures the compute API and splits the consumer and service interfaces into different modules:
- opendc-compute-api now defines the API interface for the OpenDC Compute module, which can be used by consumers of the OpenDC Compute service.
- opendc-compute-service hosts the service implementation for OpenDC Compute and contains all business logic regarding the IaaS platform (such as scheduling).
- opendc-compute-simulator implements a "compute driver" for the OpenDC Compute platform that simulates submitted workloads.
- Image is now a data-class and does not specify itself the workload to simulate. Instead, the workload should be passed via its tags currently (with key "workload"). In the future, the simulation backend will accept a mapper interface that maps Images to Workloads.
103 files changed, 2019 insertions, 1730 deletions
diff --git a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts new file mode 100644 index 00000000..10046322 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "API interface for the OpenDC Compute service" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-core")) +} diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt new file mode 100644 index 00000000..025513e6 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * A client interface for the OpenDC Compute service. + */ +public interface ComputeClient : AutoCloseable { + /** + * Create a new [Server] instance at this compute service. + * + * @param name The name of the server to deploy. + * @param image The image to be deployed. + * @param flavor The flavor of the machine instance to run this [image] on. + */ + public suspend fun newServer( + name: String, + image: Image, + flavor: Flavor + ): Server + + /** + * Release the resources associated with this client, preventing any further API calls. + */ + public override fun close() +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt index 6c724277..64a47277 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,8 @@ * SOFTWARE. */ -package org.opendc.compute.core.workload +package org.opendc.compute.api -import org.opendc.compute.core.image.Image import org.opendc.core.User import org.opendc.core.workload.Workload import java.util.UUID @@ -35,13 +34,13 @@ import java.util.UUID * @property owner The owner of the VM. * @property image The image of the VM. */ -public data class VmWorkload( +public data class ComputeWorkload( override val uid: UUID, override val name: String, override val owner: User, val image: Image ) : Workload { - override fun equals(other: Any?): Boolean = other is VmWorkload && uid == other.uid + override fun equals(other: Any?): Boolean = other is ComputeWorkload && uid == other.uid override fun hashCode(): Int = uid.hashCode() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Flavor.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt index e5ca115f..bf5f0ce4 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Flavor.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.api /** * Flavors define the compute and memory capacity of [Server] instance. To put it simply, a flavor is an available diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt index e481fcc3..280c4d89 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/Image.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.core.image +package org.opendc.compute.api import org.opendc.core.resource.Resource +import org.opendc.core.resource.TagContainer +import java.util.* /** * An image containing a bootable operating system that can directly be executed by physical or virtual server. @@ -32,4 +34,15 @@ import org.opendc.core.resource.Resource * useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server * configuration frequently. */ -public interface Image : Resource +public data class Image( + public override val uid: UUID, + public override val name: String, + public override val tags: TagContainer +) : Resource { + public companion object { + /** + * An empty boot disk [Image] that exits immediately on start. + */ + public val EMPTY: Image = Image(UUID.randomUUID(), "empty", emptyMap()) + } +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt index 948f622f..ab1eb860 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,59 +20,55 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.api -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.image.Image import org.opendc.core.resource.Resource -import org.opendc.core.resource.TagContainer -import org.opendc.core.services.ServiceRegistry -import java.util.UUID /** - * A server instance that is running on some physical or virtual machine. + * A stateful object representing a server instance that is running on some physical or virtual machine. */ -public data class Server( +public interface Server : Resource { /** - * The unique identifier of the server. + * The name of the server. */ - public override val uid: UUID, + public override val name: String /** - * The optional name of the server. + * The flavor of the server. */ - public override val name: String, + public val flavor: Flavor /** - * The tags of this server. + * The image of the server. */ - public override val tags: TagContainer, + public val image: Image /** - * The hardware configuration of the server. + * The tags assigned to the server. */ - public val flavor: Flavor, + public override val tags: Map<String, String> /** - * The image running on the server. + * The last known state of the server. */ - public val image: Image, + public val state: ServerState /** - * The last known state of the server. + * Register the specified [ServerWatcher] to watch the state of the server. + * + * @param watcher The watcher to register for the server. */ - public val state: ServerState, + public fun watch(watcher: ServerWatcher) /** - * The services published by this server. + * De-register the specified [ServerWatcher] from the server to stop it from receiving events. + * + * @param watcher The watcher to de-register from the server. */ - public val services: ServiceRegistry, + public fun unwatch(watcher: ServerWatcher) /** - * The events that are emitted by the server. + * Refresh the local state of the resource. */ - public val events: Flow<ServerEvent> -) : Resource { - override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Server && uid == other.uid + public suspend fun refresh() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerState.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt index 4b9d7c13..25d2e519 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerState.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.api /** * An enumeration describing the possible states of a server. diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt new file mode 100644 index 00000000..48a17b30 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * An interface used to watch the state of [Server] instances. + */ +public interface ServerWatcher { + /** + * This method is invoked when the state of a [Server] changes. + * + * Note that the state of [server] might not reflect the state as reported by the invocation, as a call to + * [Server.refresh] is required to update its state. + * + * @param server The server whose state has changed. + * @param newState The new state of the server. + */ + public fun onStateChanged(server: Server, newState: ServerState) {} +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt deleted file mode 100644 index 1ae52baa..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt - -import kotlinx.coroutines.flow.Flow -import org.opendc.core.Identity -import java.util.UUID - -/** - * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment - * into several virtual guest machines. - */ -public class Hypervisor( - /** - * The unique identifier of the hypervisor. - */ - override val uid: UUID, - - /** - * The optional name of the hypervisor. - */ - override val name: String, - - /** - * Metadata of the hypervisor. - */ - public val metadata: Map<String, Any>, - - /** - * The events that are emitted by the hypervisor. - */ - public val events: Flow<HypervisorEvent> -) : Identity { - override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid -} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt deleted file mode 100644 index 6fe84ea6..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt +++ /dev/null @@ -1,3 +0,0 @@ -package org.opendc.compute.core.virt.driver - -public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt deleted file mode 100644 index 68cc7b50..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt.driver - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.core.services.AbstractServiceKey -import java.util.UUID - -/** - * A driver interface for a hypervisor running on some host server and communicating with the central compute service to - * provide virtualization for that particular resource. - */ -public interface VirtDriver { - /** - * The events emitted by the driver. - */ - public val events: Flow<HypervisorEvent> - - /** - * Determine whether the specified [flavor] can still fit on this driver. - */ - public fun canFit(flavor: Flavor): Boolean - - /** - * Spawn the given [Image] on the compute resource of this driver. - * - * @param name The name of the server to spawn. - * @param image The image to deploy. - * @param flavor The flavor of the server which this driver is controlling. - * @return The virtual server spawned by this method. - */ - public suspend fun spawn( - name: String, - image: Image, - flavor: Flavor - ): Server - - public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") -} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt deleted file mode 100644 index 3d722110..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt.service - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.driver.VirtDriver - -/** - * A service for VM provisioning on a cloud. - */ -public interface VirtProvisioningService { - /** - * The events emitted by the service. - */ - public val events: Flow<VirtProvisioningEvent> - - /** - * Obtain the active hypervisors for this provisioner. - */ - public suspend fun drivers(): Set<VirtDriver> - - /** - * The number of hosts available in the system. - */ - public val hostCount: Int - - /** - * Submit the specified [Image] to the provisioning service. - * - * @param name The name of the server to deploy. - * @param image The image to be deployed. - * @param flavor The flavor of the machine instance to run this [image] on. - */ - public suspend fun deploy( - name: String, - image: Image, - flavor: org.opendc.compute.core.Flavor - ): Server - - /** - * Terminate the provisioning service releasing all the leased bare-metal machines. - */ - public suspend fun terminate() -} diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts new file mode 100644 index 00000000..1b09ef6d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "OpenDC Compute Service implementation" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-compute:opendc-compute-api")) + api(project(":opendc-trace:opendc-trace-core")) + implementation(project(":opendc-utils")) + implementation("io.github.microutils:kotlin-logging") + + testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt new file mode 100644 index 00000000..593e4b56 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.ComputeClient +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.trace.core.EventTracer +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The [ComputeService] hosts the API implementation of the OpenDC Compute service. + */ +public interface ComputeService : AutoCloseable { + /** + * The events emitted by the service. + */ + public val events: Flow<ComputeServiceEvent> + + /** + * The hosts that are used by the compute service. + */ + public val hosts: Set<Host> + + /** + * The number of hosts available in the system. + */ + public val hostCount: Int + + /** + * Create a new [ComputeClient] to control the compute service. + */ + public fun newClient(): ComputeClient + + /** + * Add a [host] to the scheduling pool of the compute service. + */ + public fun addHost(host: Host) + + /** + * Remove a [host] from the scheduling pool of the compute service. + */ + public fun removeHost(host: Host) + + /** + * Terminate the lifecycle of the compute service, stopping all running instances. + */ + public override fun close() + + public companion object { + /** + * Construct a new [ComputeService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param tracer The event tracer to use. + * @param allocationPolicy The allocation policy to use. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + tracer: EventTracer, + allocationPolicy: AllocationPolicy, + schedulingQuantum: Long = 300000, + ): ComputeService { + return ComputeServiceImpl(context, clock, tracer, allocationPolicy, schedulingQuantum) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt index abd2fc95..193008a7 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,22 +20,22 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service +package org.opendc.compute.service /** - * An event that is emitted by the [VirtProvisioningService]. + * An event that is emitted by the [ComputeService]. */ -public sealed class VirtProvisioningEvent { +public sealed class ComputeServiceEvent { /** * The service that has emitted the event. */ - public abstract val provisioner: VirtProvisioningService + public abstract val provisioner: ComputeService /** * An event emitted for writing metrics. */ public data class MetricsAvailable( - override val provisioner: VirtProvisioningService, + override val provisioner: ComputeService, public val totalHostCount: Int, public val availableHostCount: Int, public val totalVmCount: Int, @@ -45,5 +43,5 @@ public sealed class VirtProvisioningEvent { public val inactiveVmCount: Int, public val waitingVmCount: Int, public val failedVmCount: Int - ) : VirtProvisioningEvent() + ) : ComputeServiceEvent() } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt new file mode 100644 index 00000000..2cd91144 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.Server +import java.util.* + +/** + * Base interface for representing compute resources that host virtualized [Server] instances. + */ +public interface Host { + /** + * A unique identifier representing the host. + */ + public val uid: UUID + + /** + * The name of this host. + */ + public val name: String + + /** + * The machine model of the host. + */ + public val model: HostModel + + /** + * The state of the host. + */ + public val state: HostState + + /** + * The events emitted by the driver. + */ + public val events: Flow<HostEvent> + + /** + * Determine whether the specified [instance][server] can still fit on this host. + */ + public fun canFit(server: Server): Boolean + + /** + * Register the specified [instance][server] on the host. + * + * Once the method returns, the instance should be running if [start] is true or else the instance should be + * stopped. + */ + public suspend fun spawn(server: Server, start: Boolean = true) + + /** + * Determine whether the specified [instance][server] exists on the host. + */ + public operator fun contains(server: Server): Boolean + + /** + * Stat the server [instance][server] if it is currently not running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun start(server: Server) + + /** + * Stop the server [instance][server] if it is currently running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun stop(server: Server) + + /** + * Terminate the specified [instance][server] on this host and cleanup all resources associated with it. + */ + public suspend fun terminate(server: Server) + + /** + * Add a [HostListener] to this host. + */ + public fun addListener(listener: HostListener) + + /** + * Remove a [HostListener] from this host. + */ + public fun removeListener(listener: HostListener) +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt index 9fb437de..97350679 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt - -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver +package org.opendc.compute.service.driver /** - * An event that is emitted by a [VirtDriver]. + * An event that is emitted by a [Host]. */ -public sealed class HypervisorEvent { +public sealed class HostEvent { /** * The driver that emitted the event. */ - public abstract val driver: VirtDriver + public abstract val driver: Host /** * This event is emitted when the number of active servers on the server managed by this driver is updated. @@ -42,10 +39,10 @@ public sealed class HypervisorEvent { * @property availableMemory The available memory, in MB. */ public data class VmsUpdated( - override val driver: VirtDriver, + override val driver: Host, public val numberOfActiveServers: Int, public val availableMemory: Long - ) : HypervisorEvent() + ) : HostEvent() /** * This event is emitted when a slice is finished. @@ -63,7 +60,7 @@ public sealed class HypervisorEvent { * @property numberOfDeployedImages The number of images deployed on this hypervisor. */ public data class SliceFinished( - override val driver: VirtDriver, + override val driver: Host, public val requestedBurst: Long, public val grantedBurst: Long, public val overcommissionedBurst: Long, @@ -71,6 +68,5 @@ public sealed class HypervisorEvent { public val cpuUsage: Double, public val cpuDemand: Double, public val numberOfDeployedImages: Int, - public val hostServer: Server - ) : HypervisorEvent() + ) : HostEvent() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt index e9212832..f076cae3 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,22 +20,22 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.service.driver + +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState /** - * An event that is emitted by a [Server]. + * Listener interface for events originating from a [Host]. */ -public sealed class ServerEvent { +public interface HostListener { /** - * The server that emitted the event. + * This method is invoked when the state of an [instance][server] on [host] changes. */ - public abstract val server: Server + public fun onStateChanged(host: Host, server: Server, newState: ServerState) {} /** - * This event is emitted when the state of [server] changes. - * - * @property server The server of which the state changed. - * @property previousState The previous state of the server. + * This method is invoked when the state of a [Host] has changed. */ - public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent() + public fun onStateChanged(host: Host, newState: HostState) {} } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt new file mode 100644 index 00000000..5632a55e --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver + +/** + * Describes the static machine properties of the host. + * + * @property vcpuCount The number of logical processing cores available for this host. + * @property memorySize The amount of memory available for this host in MB. + */ +public data class HostModel(public val cpuCount: Int, public val memorySize: Long) diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt new file mode 100644 index 00000000..6d85ee2d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver + +/** + * The state of a host. + */ +public enum class HostState { + /** + * The host is up. + */ + UP, + + /** + * The host is down. + */ + DOWN +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt index c1802e64..a7974062 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event import java.util.* diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt index 1fea21ea..75bb09ed 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event import java.util.* diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt index 662068dd..f59c74b7 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt index 96103129..eaf0736b 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt index f6b71e22..fa0a8a13 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.image.Image +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image import org.opendc.trace.core.Event /** diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt index d0e5c102..52b91616 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt new file mode 100644 index 00000000..f84b7435 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import java.util.* + +/** + * A [Server] implementation that is passed to clients but delegates its implementation to another class. + */ +internal class ClientServer(private val delegate: Server) : Server, ServerWatcher { + private val watchers = mutableListOf<ServerWatcher>() + + override val uid: UUID = delegate.uid + + override var name: String = delegate.name + private set + + override var flavor: Flavor = delegate.flavor + private set + + override var image: Image = delegate.image + private set + + override var tags: Map<String, String> = delegate.tags.toMap() + private set + + override var state: ServerState = delegate.state + private set + + override fun watch(watcher: ServerWatcher) { + if (watchers.isEmpty()) { + delegate.watch(this) + } + + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers += watcher + + if (watchers.isEmpty()) { + delegate.unwatch(this) + } + } + + override suspend fun refresh() { + name = delegate.name + flavor = delegate.flavor + image = delegate.image + tags = delegate.tags + state = delegate.state + } + + override fun onStateChanged(server: Server, newState: ServerState) { + val watchers = watchers + + for (watcher in watchers) { + watcher.onStateChanged(this, newState) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt new file mode 100644 index 00000000..69d6bb59 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -0,0 +1,414 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import mu.KotlinLogging +import org.opendc.compute.api.* +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.events.* +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.trace.core.EventTracer +import org.opendc.utils.TimerScheduler +import org.opendc.utils.flow.EventFlow +import java.time.Clock +import java.util.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.math.max + +/** + * Internal implementation of the OpenDC Compute service. + * + * @param context The [CoroutineContext] to use. + * @param clock The clock instance to keep track of time. + */ +public class ComputeServiceImpl( + private val context: CoroutineContext, + private val clock: Clock, + private val tracer: EventTracer, + private val allocationPolicy: AllocationPolicy, + private val schedulingQuantum: Long +) : ComputeService, HostListener { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context) + + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The [Random] instance used to generate unique identifiers for the objects. + */ + private val random = Random(0) + + /** + * A mapping from host to host view. + */ + private val hostToView = mutableMapOf<Host, HostView>() + + /** + * The available hypervisors. + */ + private val availableHosts: MutableSet<HostView> = mutableSetOf() + + /** + * The servers that should be launched by the service. + */ + private val queue: Deque<LaunchRequest> = ArrayDeque() + + /** + * The active servers in the system. + */ + private val activeServers: MutableSet<Server> = mutableSetOf() + + public var submittedVms: Int = 0 + public var queuedVms: Int = 0 + public var runningVms: Int = 0 + public var finishedVms: Int = 0 + public var unscheduledVms: Int = 0 + + private var maxCores = 0 + private var maxMemory = 0L + + /** + * The allocation logic to use. + */ + private val allocationLogic = allocationPolicy() + + override val events: Flow<ComputeServiceEvent> + get() = _events + private val _events = EventFlow<ComputeServiceEvent>() + + /** + * The [TimerScheduler] to use for scheduling the scheduler cycles. + */ + private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope, clock) + + override val hosts: Set<Host> + get() = hostToView.keys + + override val hostCount: Int + get() = hostToView.size + + override fun newClient(): ComputeClient = object : ComputeClient { + private var isClosed: Boolean = false + + override suspend fun newServer(name: String, image: Image, flavor: Flavor): Server { + check(!isClosed) { "Client is closed" } + tracer.commit(VmSubmissionEvent(name, image, flavor)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + return suspendCancellableCoroutine { cont -> + val request = LaunchRequest(createServer(name, image, flavor), cont) + queue += request + requestCycle() + } + } + + override fun close() { + isClosed = true + } + + override fun toString(): String = "ComputeClient" + } + + override fun addHost(host: Host) { + // Check if host is already known + if (host in hostToView) { + return + } + + val hv = HostView(host) + maxCores = max(maxCores, host.model.cpuCount) + maxMemory = max(maxMemory, host.model.memorySize) + hostToView[host] = hv + + if (host.state == HostState.UP) { + availableHosts += hv + } + + host.addListener(this) + } + + override fun removeHost(host: Host) { + host.removeListener(this) + } + + override fun close() { + scope.cancel() + } + + private fun createServer( + name: String, + image: Image, + flavor: Flavor + ): Server { + return ServerImpl( + uid = UUID(random.nextLong(), random.nextLong()), + name = name, + flavor = flavor, + image = image + ) + } + + private fun requestCycle() { + // Bail out in case we have already requested a new cycle. + if (scheduler.isTimerActive(Unit)) { + return + } + + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // This is important because the slices of the VMs need to be aligned. + // We calculate here the delay until the next scheduling slot. + val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + + scheduler.startSingleTimer(Unit, delay) { + schedule() + } + } + + private fun schedule() { + while (queue.isNotEmpty()) { + val (server, cont) = queue.peekFirst() + val requiredMemory = server.flavor.memorySize + val selectedHv = allocationLogic.select(availableHosts, server) + + if (selectedHv == null || !selectedHv.host.canFit(server)) { + logger.trace { "Server $server selected for scheduling but no capacity available for it." } + + if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { + tracer.commit(VmSubmissionInvalidEvent(server.name)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + runningVms, + finishedVms, + --queuedVms, + ++unscheduledVms + ) + ) + + // Remove the incoming image + queue.poll() + + logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + continue + } else { + break + } + } + + logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" } + queue.poll() + + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + selectedHv.numberOfActiveServers++ + selectedHv.provisionedCores += server.flavor.cpuCount + selectedHv.availableMemory -= requiredMemory // XXX Temporary hack + + scope.launch { + try { + cont.resume(ClientServer(server)) + selectedHv.host.spawn(server) + activeServers += server + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) + + selectedHv.numberOfActiveServers-- + selectedHv.provisionedCores -= server.flavor.cpuCount + selectedHv.availableMemory += requiredMemory + } + } + } + } + + override fun onStateChanged(host: Host, newState: HostState) { + when (newState) { + HostState.UP -> { + logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + + val hv = hostToView[host] + if (hv != null) { + // Corner case for when the hypervisor already exists + availableHosts += hv + } + + tracer.commit(HypervisorAvailableEvent(host.uid)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) + + // Re-schedule on the new machine + if (queue.isNotEmpty()) { + requestCycle() + } + } + HostState.DOWN -> { + logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + + val hv = hostToView[host] ?: return + availableHosts -= hv + + tracer.commit(HypervisorUnavailableEvent(hv.uid)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) + + if (queue.isNotEmpty()) { + requestCycle() + } + } + } + } + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + val serverImpl = server as ServerImpl + serverImpl.state = newState + serverImpl.watchers.forEach { it.onStateChanged(server, newState) } + + if (newState == ServerState.SHUTOFF) { + logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + + tracer.commit(VmStoppedEvent(server.name)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) + + activeServers -= server + val hv = hostToView[host] + if (hv != null) { + hv.provisionedCores -= server.flavor.cpuCount + hv.numberOfActiveServers-- + hv.availableMemory += server.flavor.memorySize + } else { + logger.error { "Unknown host $host" } + } + + // Try to reschedule if needed + if (queue.isNotEmpty()) { + requestCycle() + } + } + } + + public data class LaunchRequest(val server: Server, val cont: Continuation<Server>) + + private class ServerImpl( + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image + ) : Server { + val watchers = mutableListOf<ServerWatcher>() + + override fun watch(watcher: ServerWatcher) { + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers -= watcher + } + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override val tags: Map<String, String> = emptyMap() + + override var state: ServerState = ServerState.BUILD + } +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index 01f86a1b..1bdfdf1a 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.core.image +package org.opendc.compute.service.internal -import org.opendc.core.resource.TagContainer +import org.opendc.compute.service.driver.Host import java.util.UUID -/** - * An empty boot disk [Image] that exits immediately on start. - */ -public object EmptyImage : Image { - override val uid: UUID = UUID.randomUUID() - override val name: String = "empty" - override val tags: TagContainer = emptyMap() +public class HostView(public val host: Host) { + public val uid: UUID + get() = host.uid + + public var numberOfActiveServers: Int = 0 + public var availableMemory: Long = host.model.memorySize + public var provisionedCores: Int = 0 } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt index 2018b9f2..5ee4c70f 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt @@ -20,11 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.core.metal.Node -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView /** * A policy for selecting the [Node] an image should be deployed to, @@ -38,9 +37,9 @@ public interface AllocationPolicy { * Select the node on which the server should be scheduled. */ public fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? + hypervisors: Set<HostView>, + server: Server + ): HostView? } /** diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt index 38a07b2b..ad422415 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * An [AllocationPolicy] that selects the machine with the highest/lowest amount of memory per core. @@ -31,8 +31,8 @@ import org.opendc.compute.simulator.HypervisorView */ public class AvailableCoreMemoryAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = - compareBy<HypervisorView> { -it.availableMemory / it.server.flavor.cpuCount } + override val comparator: Comparator<HostView> = + compareBy<HostView> { -it.availableMemory / it.host.model.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt index e87abd7b..6712b8a2 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * Allocation policy that selects the node with the most available memory. @@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView */ public class AvailableMemoryAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { -it.availableMemory } + override val comparator: Comparator<HostView> = compareBy<HostView> { -it.availableMemory } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt index 4470eab9..265d514d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView /** * The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node. @@ -32,18 +32,18 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { /** * The comparator to use. */ - public val comparator: Comparator<HypervisorView> + public val comparator: Comparator<HostView> override fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? { + hypervisors: Set<HostView>, + server: Server + ): HostView? { return hypervisors.asSequence() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.flavor.memorySize) - val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.flavor.memorySize) + val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } - .minWithOrNull(comparator.thenBy { it.server.uid }) + .minWithOrNull(comparator.thenBy { it.host.uid }) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt index 5e2b895c..29eba782 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * Allocation policy that selects the node with the least amount of active servers. @@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView */ public class NumberOfActiveServersAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { it.numberOfActiveServers } + override val comparator: Comparator<HostView> = compareBy<HostView> { it.numberOfActiveServers } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt index 4344d979..4c196953 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * An [AllocationPolicy] that takes into account the number of vCPUs that have been provisioned on this machine @@ -33,8 +33,8 @@ import org.opendc.compute.simulator.HypervisorView */ public class ProvisionedCoresAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = - compareBy<HypervisorView> { it.provisionedCores / it.server.flavor.cpuCount } + override val comparator: Comparator<HostView> = + compareBy<HostView> { it.provisionedCores / it.host.model.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt index ac34f410..3facb182 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView import kotlin.random.Random /** @@ -33,13 +33,13 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al @OptIn(ExperimentalStdlibApi::class) override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? { + hypervisors: Set<HostView>, + server: Server + ): HostView? { return hypervisors.asIterable() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) - val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long) + val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } .randomOrNull(random) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt index 5312f4da..ed1dc662 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt @@ -23,8 +23,9 @@ package org.opendc.compute.simulator.allocation import mu.KotlinLogging -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView +import org.opendc.compute.service.scheduler.AllocationPolicy private val logger = KotlinLogging.logger {} @@ -37,15 +38,15 @@ private val logger = KotlinLogging.logger {} public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? { - val clusterName = vmPlacements[image.name] - ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}") - val machinesInCluster = hypervisors.filter { it.server.name.contains(clusterName) } + hypervisors: Set<HostView>, + server: Server + ): HostView? { + val clusterName = vmPlacements[server.name] + ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") + val machinesInCluster = hypervisors.filter { it.host.name.contains(clusterName) } if (machinesInCluster.isEmpty()) { - logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." } + logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." } return hypervisors.maxByOrNull { it.availableMemory } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index f52d0f97..d7d5f002 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -30,7 +30,8 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-compute:opendc-compute-service")) + api(project(":opendc-metal")) api(project(":opendc-simulator:opendc-simulator-compute")) api(project(":opendc-simulator:opendc-simulator-failures")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt deleted file mode 100644 index 153a86b3..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.core.Server -import org.opendc.simulator.compute.SimExecutionContext - -/** - * Extended [SimExecutionContext] in which workloads within the OpenDC Compute module run. - */ -public interface ComputeSimExecutionContext : SimExecutionContext { - /** - * The server on which the image runs. - */ - public val server: Server -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt deleted file mode 100644 index 1a79523e..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import java.util.UUID - -public class HypervisorView( - public val uid: UUID, - public var server: Server, - public var numberOfActiveServers: Int, - public var availableMemory: Long, - public var provisionedCores: Int -) { - public lateinit var driver: VirtDriver -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt index 1e459e6f..2405a8f9 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt @@ -24,31 +24,23 @@ package org.opendc.compute.simulator import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.image.EmptyImage -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.NodeEvent -import org.opendc.compute.core.metal.NodeState -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image import org.opendc.compute.simulator.power.api.CpuPowerModel import org.opendc.compute.simulator.power.api.Powerable import org.opendc.compute.simulator.power.models.ConstantPowerModel -import org.opendc.core.services.ServiceRegistry +import org.opendc.metal.Node +import org.opendc.metal.NodeEvent +import org.opendc.metal.NodeState +import org.opendc.metal.driver.BareMetalDriver import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.SimExecutionContext import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.workload.SimResourceCommand import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow import org.opendc.utils.flow.StateFlow import java.time.Clock import java.util.UUID -import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -88,7 +80,7 @@ public class SimBareMetalDriver( * The machine state. */ private val nodeState = - StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events)) /** * The [SimBareMetalMachine] we use to run the workload. @@ -103,20 +95,10 @@ public class SimBareMetalDriver( override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this) /** - * The internal random instance. - */ - private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - - /** * The [Job] that runs the simulated workload. */ private var job: Job? = null - /** - * The event stream to publish to for the server. - */ - private var serverEvents: EventFlow<ServerEvent>? = null - override suspend fun init(): Node { return nodeState.value } @@ -127,51 +109,13 @@ public class SimBareMetalDriver( return node } - val events = EventFlow<ServerEvent>() - serverEvents = events - val server = Server( - UUID(random.nextLong(), random.nextLong()), - node.name, - emptyMap(), - flavor, - node.image, - ServerState.BUILD, - ServiceRegistry().put(BareMetalDriver, this@SimBareMetalDriver), - events - ) - - val delegate = (node.image as SimWorkloadImage).workload - // Wrap the workload to pass in a ComputeSimExecutionContext - val workload = object : SimWorkload { - lateinit var wrappedCtx: ComputeSimExecutionContext - - override fun onStart(ctx: SimExecutionContext) { - wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { - override val server: Server - get() = nodeState.value.server!! - - override fun toString(): String = "WrappedSimExecutionContext" - } - - delegate.onStart(wrappedCtx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return delegate.onStart(wrappedCtx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return delegate.onNext(wrappedCtx, cpu, remainingWork) - } - - override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)" - } + val workload = node.image.tags["workload"] as SimWorkload job = coroutineScope.launch { delay(1) // TODO Introduce boot time initMachine() try { - machine.run(workload) + machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node)) exitMachine(null) } catch (_: CancellationException) { // Ignored @@ -180,31 +124,21 @@ public class SimBareMetalDriver( } } - setNode(node.copy(state = NodeState.BOOT, server = server)) + setNode(node.copy(state = NodeState.BOOT)) return nodeState.value } private fun initMachine() { - val server = nodeState.value.server?.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) + setNode(nodeState.value.copy(state = NodeState.ACTIVE)) } private fun exitMachine(cause: Throwable?) { - val newServerState = - if (cause == null) - ServerState.SHUTOFF - else - ServerState.ERROR val newNodeState = if (cause == null) - nodeState.value.state + NodeState.SHUTOFF else NodeState.ERROR - val server = nodeState.value.server?.copy(state = newServerState) - setNode(nodeState.value.copy(state = newNodeState, server = server)) - - serverEvents?.close() - serverEvents = null + setNode(nodeState.value.copy(state = newNodeState)) } override suspend fun stop(): Node { @@ -214,7 +148,7 @@ public class SimBareMetalDriver( } job?.cancelAndJoin() - setNode(node.copy(state = NodeState.SHUTOFF, server = null)) + setNode(node.copy(state = NodeState.SHUTOFF)) return node } @@ -236,13 +170,6 @@ public class SimBareMetalDriver( events.emit(NodeEvent.StateChanged(value, field.state)) } - val oldServer = field.server - val newServer = value.server - - if (oldServer != null && newServer != null && oldServer.state != newServer.state) { - serverEvents?.emit(ServerEvent.StateChanged(newServer, oldServer.state)) - } - nodeState.value = value } @@ -250,13 +177,11 @@ public class SimBareMetalDriver( get() = coroutineScope override suspend fun fail() { - val server = nodeState.value.server?.copy(state = ServerState.ERROR) - setNode(nodeState.value.copy(state = NodeState.ERROR, server = server)) + setNode(nodeState.value.copy(state = NodeState.ERROR)) } override suspend fun recover() { - val server = nodeState.value.server?.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) + setNode(nodeState.value.copy(state = NodeState.ACTIVE)) } override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})" diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt new file mode 100644 index 00000000..fd547d3d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -0,0 +1,303 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import mu.KotlinLogging +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.* +import org.opendc.metal.Node +import org.opendc.simulator.compute.* +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.utils.flow.EventFlow +import java.util.* +import kotlin.coroutines.resume + +/** + * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. + */ +public class SimHost( + public val node: Node, + private val coroutineScope: CoroutineScope, + hypervisor: SimHypervisorProvider +) : Host, SimWorkload { + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The execution context in which the [Host] runs. + */ + private lateinit var ctx: SimExecutionContext + + override val events: Flow<HostEvent> + get() = _events + internal val _events = EventFlow<HostEvent>() + + /** + * The event listeners registered with this host. + */ + private val listeners = mutableListOf<HostListener>() + + /** + * Current total memory use of the images on this hypervisor. + */ + private var availableMemory: Long = 0 + + /** + * The hypervisor to run multiple workloads. + */ + private val hypervisor = hypervisor.create( + object : SimHypervisor.Listener { + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + _events.emit( + HostEvent.SliceFinished( + this@SimHost, + requestedWork, + grantedWork, + overcommittedWork, + interferedWork, + cpuUsage, + cpuDemand, + guests.size + ) + ) + } + } + ) + + /** + * The virtual machines running on the hypervisor. + */ + private val guests = HashMap<Server, Guest>() + + override val uid: UUID + get() = node.uid + + override val name: String + get() = node.name + + override val model: HostModel + get() = HostModel(node.flavor.cpuCount, node.flavor.memorySize) + + override val state: HostState + get() = _state + private var _state: HostState = HostState.UP + set(value) { + listeners.forEach { it.onStateChanged(this, value) } + field = value + } + + override fun canFit(server: Server): Boolean { + val sufficientMemory = availableMemory > server.flavor.memorySize + val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount + val canFit = hypervisor.canFit(server.flavor.toMachineModel()) + + return sufficientMemory && enoughCpus && canFit + } + + override suspend fun spawn(server: Server, start: Boolean) { + // Return if the server already exists on this host + if (server in this) { + return + } + + require(canFit(server)) { "Server does not fit" } + val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) + guests[server] = guest + + if (start) { + guest.start() + } + + _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + override fun contains(server: Server): Boolean { + return server in guests + } + + override suspend fun start(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.start() + } + + override suspend fun stop(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.stop() + } + + override suspend fun terminate(server: Server) { + val guest = guests.remove(server) ?: return + guest.terminate() + } + + override fun addListener(listener: HostListener) { + listeners.add(listener) + } + + override fun removeListener(listener: HostListener) { + listeners.remove(listener) + } + + /** + * Convert flavor to machine model. + */ + private fun Flavor.toMachineModel(): SimMachineModel { + val originalCpu = ctx.machine.cpus[0] + val processingNode = originalCpu.node.copy(coreCount = cpuCount) + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return SimMachineModel(processingUnits, memoryUnits) + } + + private fun onGuestStart(vm: Guest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStart(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + } + + private fun onGuestStop(vm: Guest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStop(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + + _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + /** + * A virtual machine instance that the driver manages. + */ + private inner class Guest(val server: Server, val machine: SimMachine) { + val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + + var state: ServerState = ServerState.SHUTOFF + + suspend fun start() { + when (state) { + ServerState.SHUTOFF -> { + logger.info { "User requested to start server ${server.uid}" } + launch() + } + ServerState.ACTIVE -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun stop() { + when (state) { + ServerState.ACTIVE, ServerState.ERROR -> { + val job = job ?: throw IllegalStateException("Server should be active") + job.cancel() + job.join() + } + ServerState.SHUTOFF -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun terminate() { + stop() + } + + private var job: Job? = null + + private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> + assert(job == null) { "Concurrent job running" } + val workload = server.image.tags["workload"] as SimWorkload + + val job = coroutineScope.launch { + delay(1) // TODO Introduce boot time + init() + cont.resume(Unit) + try { + machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) + exit(null) + } catch (cause: Throwable) { + exit(cause) + } finally { + machine.close() + } + } + this.job = job + job.invokeOnCompletion { + this.job = null + } + } + + private fun init() { + state = ServerState.ACTIVE + onGuestStart(this) + } + + private fun exit(cause: Throwable?) { + state = + if (cause == null) + ServerState.SHUTOFF + else + ServerState.ERROR + + availableMemory += server.flavor.memorySize + onGuestStop(this) + } + } + + override fun onStart(ctx: SimExecutionContext) { + this.ctx = ctx + this.availableMemory = ctx.machine.memory.map { it.size }.sum() + this.hypervisor.onStart(ctx) + } + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return hypervisor.onStart(ctx, cpu) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + return hypervisor.onNext(ctx, cpu, remainingWork) + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt new file mode 100644 index 00000000..bb03777b --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import kotlinx.coroutines.* +import org.opendc.compute.api.Image +import org.opendc.compute.service.driver.Host +import org.opendc.metal.Node +import org.opendc.metal.service.ProvisioningService +import org.opendc.simulator.compute.SimHypervisorProvider +import kotlin.coroutines.CoroutineContext + +/** + * A helper class to provision [SimHost]s on top of bare-metal machines using the [ProvisioningService]. + * + * @param context The [CoroutineContext] to use. + * @param metal The [ProvisioningService] to use. + * @param hypervisor The type of hypervisor to use. + */ +public class SimHostProvisioner( + private val context: CoroutineContext, + private val metal: ProvisioningService, + private val hypervisor: SimHypervisorProvider +) : AutoCloseable { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context) + + /** + * Provision all machines with a host. + */ + public suspend fun provisionAll(): List<Host> = coroutineScope { + metal.nodes().map { node -> async { provision(node) } }.awaitAll() + } + + /** + * Provision the specified [Node]. + */ + public suspend fun provision(node: Node): Host = coroutineScope { + val host = SimHost(node, scope, hypervisor) + metal.deploy(node, Image(node.uid, node.name, mapOf("workload" to host))) + host + } + + override fun close() { + scope.cancel() + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt deleted file mode 100644 index d7a8a8b2..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import org.opendc.compute.core.* -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.core.services.ServiceRegistry -import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.utils.flow.EventFlow -import java.util.* - -/** - * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor]. - */ -public class SimVirtDriver(private val coroutineScope: CoroutineScope, hypervisor: SimHypervisorProvider) : VirtDriver, SimWorkload { - /** - * The execution context in which the [VirtDriver] runs. - */ - private lateinit var ctx: ComputeSimExecutionContext - - /** - * The server hosting this hypervisor. - */ - public val server: Server - get() = ctx.server - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<HypervisorEvent>() - - override val events: Flow<HypervisorEvent> = eventFlow - - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = 0 - - /** - * The hypervisor to run multiple workloads. - */ - private val hypervisor = hypervisor.create( - object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - eventFlow.emit( - HypervisorEvent.SliceFinished( - this@SimVirtDriver, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - vms.size, - ctx.server - ) - ) - } - } - ) - - /** - * The virtual machines running on the hypervisor. - */ - private val vms = HashSet<VirtualMachine>() - - override fun canFit(flavor: Flavor): Boolean { - val sufficientMemory = availableMemory > flavor.memorySize - val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount - val canFit = hypervisor.canFit(flavor.toMachineModel()) - - return sufficientMemory && enoughCpus && canFit - } - - override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server { - val requiredMemory = flavor.memorySize - if (availableMemory - requiredMemory < 0) { - throw InsufficientMemoryOnServerException() - } - require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" } - - val events = EventFlow<ServerEvent>() - val server = Server( - UUID.randomUUID(), - name, - emptyMap(), - flavor, - image, - ServerState.BUILD, - ServiceRegistry(), - events - ) - availableMemory -= requiredMemory - - val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel())) - vms.add(vm) - vmStarted(vm) - eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) - return server - } - - /** - * Convert flavor to machine model. - */ - private fun Flavor.toMachineModel(): SimMachineModel { - val originalCpu = ctx.machine.cpus[0] - val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - - return SimMachineModel(processingUnits, memoryUnits) - } - - private fun vmStarted(vm: VirtualMachine) { - vms.forEach { it -> - vm.performanceInterferenceModel?.onStart(it.server.image.name) - } - } - - private fun vmStopped(vm: VirtualMachine) { - vms.forEach { it -> - vm.performanceInterferenceModel?.onStop(it.server.image.name) - } - } - - /** - * A virtual machine instance that the driver manages. - */ - private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - - val job = coroutineScope.launch { - val delegate = (server.image as SimWorkloadImage).workload - // Wrap the workload to pass in a ComputeSimExecutionContext - val workload = object : SimWorkload { - lateinit var wrappedCtx: ComputeSimExecutionContext - - override fun onStart(ctx: SimExecutionContext) { - wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { - override val server: Server - get() = server - - override fun toString(): String = "WrappedSimExecutionContext" - } - - delegate.onStart(wrappedCtx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return delegate.onStart(wrappedCtx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return delegate.onNext(wrappedCtx, cpu, remainingWork) - } - - override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)" - } - - delay(1) // TODO Introduce boot time - init() - try { - machine.run(workload) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - machine.close() - } - } - - var server: Server = server - set(value) { - if (field.state != value.state) { - events.emit(ServerEvent.StateChanged(value, field.state)) - } - - field = value - } - - private fun init() { - server = server.copy(state = ServerState.ACTIVE) - } - - private fun exit(cause: Throwable?) { - val serverState = - if (cause == null) - ServerState.SHUTOFF - else - ServerState.ERROR - server = server.copy(state = serverState) - availableMemory += server.flavor.memorySize - vms.remove(this) - vmStopped(this) - eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory)) - events.close() - } - } - - override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx as ComputeSimExecutionContext - this.availableMemory = ctx.machine.memory.map { it.size }.sum() - this.hypervisor.onStart(ctx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return hypervisor.onStart(ctx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return hypervisor.onNext(ctx, cpu, remainingWork) - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt deleted file mode 100644 index defea888..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ /dev/null @@ -1,418 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.virt.service.VirtProvisioningService -import org.opendc.compute.core.virt.service.events.* -import org.opendc.compute.simulator.allocation.AllocationPolicy -import org.opendc.simulator.compute.SimHypervisorProvider -import org.opendc.trace.core.EventTracer -import org.opendc.utils.TimerScheduler -import org.opendc.utils.flow.EventFlow -import java.time.Clock -import java.util.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.math.max - -@OptIn(ExperimentalCoroutinesApi::class) -public class SimVirtProvisioningService( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - private val provisioningService: ProvisioningService, - public val allocationPolicy: AllocationPolicy, - private val tracer: EventTracer, - private val hypervisor: SimHypervisorProvider, - private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds -) : VirtProvisioningService { - /** - * The logger instance to use. - */ - private val logger = KotlinLogging.logger {} - - /** - * The hypervisors that have been launched by the service. - */ - private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf() - - /** - * The available hypervisors. - */ - private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf() - - /** - * The incoming images to be processed by the provisioner. - */ - private val incomingImages: Deque<ImageView> = ArrayDeque() - - /** - * The active images in the system. - */ - private val activeImages: MutableSet<ImageView> = mutableSetOf() - - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var finishedVms: Int = 0 - public var unscheduledVms: Int = 0 - - private var maxCores = 0 - private var maxMemory = 0L - - /** - * The allocation logic to use. - */ - private val allocationLogic = allocationPolicy() - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<VirtProvisioningEvent>() - - override val events: Flow<VirtProvisioningEvent> = eventFlow - - /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. - */ - private var scheduler: TimerScheduler<Unit> = TimerScheduler(coroutineScope, clock) - - init { - coroutineScope.launch { - val provisionedNodes = provisioningService.nodes() - provisionedNodes.forEach { node -> - val workload = SimVirtDriver(coroutineScope, hypervisor) - val hypervisorImage = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), workload) - launch { - var init = false - val deployedNode = provisioningService.deploy(node, hypervisorImage) - val server = deployedNode.server!! - server.events.onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - if (!init) { - init = true - } - stateChanged(event.server) - } - } - }.launchIn(this) - - delay(1) - onHypervisorAvailable(server, workload) - } - } - } - } - - override suspend fun drivers(): Set<VirtDriver> { - return availableHypervisors.map { it.driver }.toSet() - } - - override val hostCount: Int = hypervisors.size - - override suspend fun deploy( - name: String, - image: Image, - flavor: Flavor - ): Server { - tracer.commit(VmSubmissionEvent(name, image, flavor)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - ) - ) - - return suspendCancellableCoroutine<Server> { cont -> - val vmInstance = ImageView(name, image, flavor, cont) - incomingImages += vmInstance - requestCycle() - } - } - - override suspend fun terminate() { - val provisionedNodes = provisioningService.nodes() - provisionedNodes.forEach { node -> provisioningService.stop(node) } - } - - private fun requestCycle() { - // Bail out in case we have already requested a new cycle. - if (scheduler.isTimerActive(Unit)) { - return - } - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) - - scheduler.startSingleTimer(Unit, delay) { - coroutineScope.launch { schedule() } - } - } - - private suspend fun schedule() { - while (incomingImages.isNotEmpty()) { - val imageInstance = incomingImages.peekFirst() - val requiredMemory = imageInstance.flavor.memorySize - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) - - if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) { - logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." } - - if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - tracer.commit(VmSubmissionInvalidEvent(imageInstance.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - --queuedVms, - ++unscheduledVms - ) - ) - - // Remove the incoming image - incomingImages.poll() - - logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") - continue - } else { - break - } - } - - try { - logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } - incomingImages.poll() - - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += imageInstance.flavor.cpuCount - selectedHv.availableMemory -= requiredMemory // XXX Temporary hack - - val server = selectedHv.driver.spawn( - imageInstance.name, - imageInstance.image, - imageInstance.flavor - ) - imageInstance.server = server - imageInstance.continuation.resume(server) - - tracer.commit(VmScheduledEvent(imageInstance.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) - ) - activeImages += imageInstance - - server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - tracer.commit(VmStoppedEvent(event.server.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - ) - ) - - activeImages -= imageInstance - selectedHv.provisionedCores -= server.flavor.cpuCount - - // Try to reschedule if needed - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - } - } - } - .launchIn(coroutineScope) - } catch (e: InsufficientMemoryOnServerException) { - logger.error("Failed to deploy VM", e) - - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= imageInstance.flavor.cpuCount - selectedHv.availableMemory += requiredMemory - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) - } - } - } - - private fun stateChanged(server: Server) { - when (server.state) { - ServerState.ACTIVE -> { - logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" } - - if (server in hypervisors) { - // Corner case for when the hypervisor already exists - availableHypervisors += hypervisors.getValue(server) - } else { - val hv = HypervisorView( - server.uid, - server, - 0, - server.flavor.memorySize, - 0 - ) - maxCores = max(maxCores, server.flavor.cpuCount) - maxMemory = max(maxMemory, server.flavor.memorySize) - hypervisors[server] = hv - } - - tracer.commit(HypervisorAvailableEvent(server.uid)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - - // Re-schedule on the new machine - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - ServerState.SHUTOFF, ServerState.ERROR -> { - logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } - val hv = hypervisors[server] ?: return - availableHypervisors -= hv - - tracer.commit(HypervisorUnavailableEvent(hv.uid)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - else -> throw IllegalStateException() - } - } - - private fun onHypervisorAvailable(server: Server, hypervisor: SimVirtDriver) { - val hv = hypervisors[server] ?: return - hv.driver = hypervisor - availableHypervisors += hv - - tracer.commit(HypervisorAvailableEvent(hv.uid)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - - hv.driver.events - .onEach { event -> - if (event is HypervisorEvent.VmsUpdated) { - hv.numberOfActiveServers = event.numberOfActiveServers - hv.availableMemory = event.availableMemory - } - }.launchIn(coroutineScope) - - requestCycle() - } - - public data class ImageView( - public val name: String, - public val image: Image, - public val flavor: Flavor, - public val continuation: Continuation<Server>, - public var server: Server? = null - ) -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt deleted file mode 100644 index b48de1d5..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.core.image.Image -import org.opendc.core.resource.TagContainer -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.* - -/** - * An application [Image] that runs a [SimWorkload]. - * - * @property uid The unique identifier of this image. - * @property name The name of this image. - * @property tags The tags attached to the image. - * @property workload The workload to run for this image. - */ -public data class SimWorkloadImage( - public override val uid: UUID, - public override val name: String, - public override val tags: TagContainer, - public val workload: SimWorkload -) : Image diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt index ee9e130b..0141bc8c 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt @@ -2,7 +2,7 @@ package org.opendc.compute.simulator.power.api import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.metal.driver.BareMetalDriver public interface CpuPowerModel { /** diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt index 938e5607..b0c3fa4c 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt @@ -5,10 +5,10 @@ import org.opendc.compute.simulator.power.api.CpuPowerModel /** * A decorator for ignoring the idle power when computing energy consumption of components. * - * @param cpuModelWrappee The wrappe of a [CpuPowerModel]. + * @param delegate The [CpuPowerModel] to delegate to. */ -public class ZeroIdlePowerDecorator(private val cpuModelWrappee: CpuPowerModel) : CpuPowerModel { +public class ZeroIdlePowerDecorator(private val delegate: CpuPowerModel) : CpuPowerModel { override fun computeCpuPower(cpuUtil: Double): Double { - return if (cpuUtil == 0.0) 0.0 else cpuModelWrappee.computeCpuPower(cpuUtil) + return if (cpuUtil == 0.0) 0.0 else delegate.computeCpuPower(cpuUtil) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt index fb8a5f47..0d90376e 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt @@ -30,8 +30,10 @@ import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState +import org.junit.jupiter.api.assertAll +import org.opendc.compute.api.Image +import org.opendc.metal.NodeEvent +import org.opendc.metal.NodeState import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -59,22 +61,21 @@ internal class SimBareMetalDriverTest { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) - var finalState: ServerState = ServerState.BUILD + var finalState: NodeState = NodeState.UNKNOWN var finalTime = 0L testScope.launch { val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(4_000, utilization = 1.0)) - + val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to SimFlopsWorkload(4_000, utilization = 1.0))) // Batch driver commands withContext(coroutineContext) { driver.init() driver.setImage(image) - val server = driver.start().server!! - server.events.collect { event -> + val node = driver.start() + node.events.collect { event -> when (event) { - is ServerEvent.StateChanged -> { - finalState = event.server.state + is NodeEvent.StateChanged -> { + finalState = event.node.state finalTime = clock.millis() } } @@ -83,7 +84,9 @@ internal class SimBareMetalDriverTest { } testScope.advanceUntilIdle() - assertEquals(ServerState.SHUTOFF, finalState) - assertEquals(501, finalTime) + assertAll( + { assertEquals(NodeState.SHUTOFF, finalState) }, + { assertEquals(501, finalTime) } + ) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 1831eae0..61bff39f 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,6 +24,7 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -32,8 +33,14 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.service.driver.HostEvent +import org.opendc.metal.Node +import org.opendc.metal.NodeState import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -48,7 +55,7 @@ import java.util.UUID * Basic test-suite for the hypervisor. */ @OptIn(ExperimentalCoroutinesApi::class) -internal class SimVirtDriverTest { +internal class SimHostTest { private lateinit var scope: TestCoroutineScope private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @@ -75,35 +82,42 @@ internal class SimVirtDriverTest { var grantedWork = 0L var overcommittedWork = 0L + val node = Node( + UUID.randomUUID(), "name", emptyMap(), NodeState.SHUTOFF, + Flavor(machineModel.cpus.size, machineModel.memory.map { it.size }.sum()), Image.EMPTY, emptyFlow() + ) + scope.launch { - val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider()) - val vmm = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), virtDriver) + val virtDriver = SimHost(node, this, SimFairShareHypervisorProvider()) + val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver)) val duration = 5 * 60L - val vmImageA = SimWorkloadImage( + val vmImageA = Image( UUID.randomUUID(), "<unnamed>", - emptyMap(), - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) - ), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) + ), + ) ) ) - val vmImageB = SimWorkloadImage( + val vmImageB = Image( UUID.randomUUID(), "<unnamed>", - emptyMap(), - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) + ) ) - ), + ) ) val metalDriver = @@ -119,7 +133,7 @@ internal class SimVirtDriverTest { virtDriver.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> { + is HostEvent.SliceFinished -> { requestedWork += event.requestedBurst grantedWork += event.grantedBurst overcommittedWork += event.overcommissionedBurst @@ -128,8 +142,8 @@ internal class SimVirtDriverTest { } .launchIn(this) - virtDriver.spawn("a", vmImageA, flavor) - virtDriver.spawn("b", vmImageB, flavor) + launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } + launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } } scope.advanceUntilIdle() @@ -142,4 +156,20 @@ internal class SimVirtDriverTest { { assertEquals(1200006, scope.currentTime) } ) } + + private class MockServer( + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image + ) : Server { + override val tags: Map<String, String> = emptyMap() + override val state: ServerState = ServerState.BUILD + + override fun watch(watcher: ServerWatcher) {} + + override fun unwatch(watcher: ServerWatcher) {} + + override suspend fun refresh() {} + } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt index a33a4e5f..33b3db94 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt @@ -29,7 +29,8 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.opendc.compute.core.metal.service.SimpleProvisioningService +import org.opendc.compute.api.Image +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -64,7 +65,7 @@ internal class SimProvisioningServiceTest { val clock = DelayControllerClockAdapter(testScope) testScope.launch { - val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(1000)) + val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("machine" to SimFlopsWorkload(1000))) val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) val provisioner = SimpleProvisioningService() @@ -72,7 +73,7 @@ internal class SimProvisioningServiceTest { delay(5) val nodes = provisioner.nodes() val node = provisioner.deploy(nodes.first(), image) - node.server!!.events.collect { println(it) } + node.events.collect { println(it) } } testScope.advanceUntilIdle() diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt index 7b0c7515..d4d88fb1 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt @@ -7,9 +7,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource -import org.opendc.compute.core.metal.driver.BareMetalDriver import org.opendc.compute.simulator.power.api.CpuPowerModel import org.opendc.compute.simulator.power.models.* +import org.opendc.metal.driver.BareMetalDriver import java.util.stream.Stream import kotlin.math.pow diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 8f3e686a..a5cf4fc0 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,22 +32,26 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.driver.BareMetalDriver -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.compute.simulator.SimVirtDriver -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.metal.NODE_CLUSTER +import org.opendc.metal.NodeEvent +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.failures.CorrelatedFaultInjector @@ -135,6 +139,12 @@ public fun createTraceReader( ) } +public data class ProvisionerResult( + val metal: ProvisioningService, + val provisioner: SimHostProvisioner, + val compute: ComputeServiceImpl +) + /** * Construct the environment for a VM provisioner and return the provisioner instance. */ @@ -144,45 +154,52 @@ public suspend fun createProvisioner( environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): Pair<ProvisioningService, SimVirtProvisioningService> { +): ProvisionerResult { val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) + val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider()) + val hosts = provisioner.provisionAll() + + val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + + for (host in hosts) { + scheduler.addHost(host) + } // Wait for the hypervisors to be spawned delay(10) - return bareMetalProvisioner to scheduler + return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler) } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public suspend fun attachMonitor( +public fun attachMonitor( coroutineScope: CoroutineScope, clock: Clock, - scheduler: SimVirtProvisioningService, + scheduler: ComputeService, monitor: ExperimentMonitor ) { - val hypervisors = scheduler.drivers() + val hypervisors = scheduler.hosts // Monitor hypervisor events for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).server + // TODO Do not expose Host directly but use Hypervisor class. + val server = (hypervisor as SimHost).node monitor.reportHostStateChange(clock.millis(), hypervisor, server) server.events .onEach { event -> val time = clock.millis() when (event) { - is ServerEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.server) + is NodeEvent.StateChanged -> { + monitor.reportHostStateChange(time, hypervisor, event.node) } } } @@ -190,7 +207,7 @@ public suspend fun attachMonitor( hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + is HostEvent.SliceFinished -> monitor.reportHostSlice( clock.millis(), event.requestedBurst, event.grantedBurst, @@ -199,22 +216,22 @@ public suspend fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer + (event.driver as SimHost).node ) } } .launchIn(coroutineScope) - val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver + val driver = server.metadata["driver"] as SimBareMetalDriver driver.powerDraw - .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .onEach { monitor.reportPowerConsumption(server, it) } .launchIn(coroutineScope) } scheduler.events .onEach { event -> when (event) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> monitor.reportProvisionerMetrics(clock.millis(), event) } } @@ -227,11 +244,12 @@ public suspend fun attachMonitor( public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, - reader: TraceReader<VmWorkload>, - scheduler: SimVirtProvisioningService, + reader: TraceReader<ComputeWorkload>, + scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor ) { + val client = scheduler.newClient() try { var submitted = 0 @@ -242,7 +260,7 @@ public suspend fun processTrace( delay(max(0, time - clock.millis())) coroutineScope.launch { chan.send(Unit) - val server = scheduler.deploy( + val server = client.newServer( workload.image.name, workload.image, Flavor( @@ -250,21 +268,19 @@ public suspend fun processTrace( workload.image.tags["required-memory"] as Long ) ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(clock.millis(), it.server) - } + + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) } - .collect() + }) } } scheduler.events .takeWhile { when (it) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> it.inactiveVmCount + it.failedVmCount != submitted } } @@ -272,5 +288,6 @@ public suspend fun processTrace( delay(1) } finally { reader.close() + client.close() } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 75b0d735..ff0a026d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { ) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 3c6637bf..1e42cf56 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -22,9 +22,11 @@ package org.opendc.experiments.capelin.monitor -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host +import org.opendc.metal.Node import java.io.Closeable /** @@ -34,22 +36,22 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a VM changes. */ - public fun reportVmStateChange(time: Long, server: Server) {} + public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} /** * This method is invoked when the state of a host changes. */ public fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { } /** * Report the power consumption of a host. */ - public fun reportPowerConsumption(host: Server, draw: Double) {} + public fun reportPowerConsumption(host: Node, draw: Double) {} /** * This method is invoked for a host for each slice that is finishes. @@ -63,7 +65,7 @@ public interface ExperimentMonitor : Closeable { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long = 5 * 60 * 1000L ) { } @@ -71,5 +73,5 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index a0d57656..98052214 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -23,13 +23,15 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter +import org.opendc.metal.Node import java.io.File /** @@ -49,10 +51,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private val currentHostEvent = mutableMapOf<Node, HostEvent>() private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) { + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { if (startTime < 0) { startTime = time @@ -63,12 +65,12 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: override fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } - val previousEvent = currentHostEvent[server] + val previousEvent = currentHostEvent[host] val roundedTime = previousEvent?.let { val duration = time - it.timestamp @@ -91,13 +93,13 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: 0.0, 0.0, 0, - server + host ) } - private val lastPowerConsumption = mutableMapOf<Server, Double>() + private val lastPowerConsumption = mutableMapOf<Node, Double>() - override fun reportPowerConsumption(host: Server, draw: Double) { + override fun reportPowerConsumption(host: Node, draw: Double) { lastPowerConsumption[host] = draw } @@ -110,16 +112,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { - val previousEvent = currentHostEvent[hostServer] + val previousEvent = currentHostEvent[host] when { previousEvent == null -> { val event = HostEvent( time, 5 * 60 * 1000L, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -127,17 +129,17 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } previousEvent.timestamp == time -> { val event = HostEvent( time, previousEvent.duration, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -145,11 +147,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } else -> { hostWriter.write(previousEvent) @@ -157,7 +159,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: val event = HostEvent( time, time - previousEvent.timestamp, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -165,16 +167,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } } } - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerWriter.write( ProvisionerEvent( time, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt index e5e9d520..e7b6a7bb 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.compute.core.Server +import org.opendc.metal.Node /** * A periodic report of the host machine metrics. @@ -30,7 +30,7 @@ import org.opendc.compute.core.Server public data class HostEvent( override val timestamp: Long, public val duration: Long, - public val host: Server, + public val node: Node, public val vmCount: Int, public val requestedBurst: Long, public val grantedBurst: Long, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt index 427c453a..7631f55f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.compute.core.Server +import org.opendc.compute.api.Server /** * A periodic report of a virtual machine's metrics. diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index 4a3e7963..b4fdd66a 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : // record.put("portfolio_id", event.run.parent.parent.id) // record.put("scenario_id", event.run.parent.id) // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) + record.put("host_id", event.node.name) + record.put("state", event.node.state.name) record.put("timestamp", event.timestamp) record.put("duration", event.duration) record.put("vm_count", event.vmCount) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt index 6cfdae40..f9630078 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.experiments.capelin.trace -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry @@ -45,11 +45,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, workload: Workload, seed: Int -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The iterator over the actual trace. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> = + private val iterator: Iterator<TraceEntry<ComputeWorkload>> = rawReaders .map { it.read() } .run { @@ -73,11 +73,10 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) val newImage = - SimWorkloadImage( + Image( image.uid, image.name, image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - (image as SimWorkloadImage).workload ) val newWorkload = entry.workload.copy(image = newImage) Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) @@ -88,7 +87,7 @@ public class Sc20ParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index d2560d62..b29bdc54 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -26,8 +26,8 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -108,11 +108,12 @@ public class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(vmFragments) + val vmWorkload = ComputeWorkload( uid, id, UnnamedUser, - SimWorkloadImage( + Image( uid, id, mapOf( @@ -120,9 +121,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { "end-time" to endTime, "total-load" to totalLoad, "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(vmFragments) + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) entries.add(TraceEntryImpl(submissionTime, vmWorkload)) @@ -150,7 +151,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<VmWorkload>> = entries + public fun read(): List<TraceEntry<ComputeWorkload>> = entries /** * An unnamed user. @@ -165,6 +166,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { */ internal data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index 12705c80..c588fda3 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -31,8 +31,8 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -62,11 +62,11 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * The intermediate buffer to store the read records in. @@ -235,19 +235,20 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), Random(random.nextInt()) ) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(fragments) + val vmWorkload = ComputeWorkload( uid, "VM Workload $id", UnnamedUser, - SimWorkloadImage( + Image( uid, id, mapOf( IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(fragments), + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) @@ -263,7 +264,7 @@ public class Sc20StreamingParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() { readerThread.interrupt() @@ -300,6 +301,6 @@ public class Sc20StreamingParquetTraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 4d9b9df1..881652f6 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -23,8 +23,8 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload @@ -38,11 +38,11 @@ private val logger = KotlinLogging.logger {} * Sample the workload for the specified [run]. */ public fun sampleWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { return when { workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) workload.samplingStrategy == SamplingStrategy.HPC -> @@ -58,15 +58,15 @@ public fun sampleWorkload( * Sample a regular (non-HPC) workload. */ public fun sampleRegularWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { val fraction = subWorkload.fraction val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { @@ -93,11 +93,11 @@ public fun sampleRegularWorkload( * Sample a HPC workload. */ public fun sampleHpcWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, seed: Int, sampleOnLoad: Boolean -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { val pattern = Regex("^vm__workload__(ComputeNode|cn).*") val random = Random(seed) @@ -109,7 +109,7 @@ public fun sampleHpcWorkload( val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() hpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -118,7 +118,7 @@ public fun sampleHpcWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() nonHpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -139,7 +139,7 @@ public fun sampleHpcWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() if (sampleOnLoad) { var currentLoad = 0.0 @@ -194,17 +194,16 @@ public fun sampleHpcWorkload( /** * Sample a random trace entry. */ -private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> { +private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> { val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = SimWorkloadImage( + val image = Image( id, entry.workload.image.name, - entry.workload.image.tags, - (entry.workload.image as SimWorkloadImage).workload + entry.workload.image.tags ) val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) return VmTraceEntry(vmWorkload, entry.submissionTime) } -private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : - TraceEntry<VmWorkload> +private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) : + TraceEntry<ComputeWorkload> diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 6a0796f6..dfc6b90b 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,10 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Server -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain import org.opendc.experiments.capelin.experiment.createProvisioner @@ -47,6 +46,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.metal.Node import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import java.io.File @@ -97,7 +97,7 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: SimVirtProvisioningService + lateinit var scheduler: ComputeServiceImpl val tracer = EventTracer(clock) testScope.launch { @@ -108,8 +108,8 @@ class CapelinIntegrationTest { allocationPolicy, tracer ) - val bareMetalProvisioner = res.first - scheduler = res.second + val bareMetalProvisioner = res.metal + scheduler = res.compute val failureDomain = if (failures) { println("ENABLING failures") @@ -138,8 +138,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() monitor.close() + res.provisioner.close() } runSimulation() @@ -148,9 +149,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1684849230562, monitor.totalRequestedBurst) }, - { assertEquals(447612683996, monitor.totalGrantedBurst) }, - { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, + { assertEquals(1678587333640, monitor.totalRequestedBurst) }, + { assertEquals(438118200924, monitor.totalGrantedBurst) }, + { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -162,19 +163,16 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - lateinit var scheduler: SimVirtProvisioningService val tracer = EventTracer(clock) testScope.launch { - val res = createProvisioner( + val (_, provisioner, scheduler) = createProvisioner( this, clock, environmentReader, allocationPolicy, tracer ) - scheduler = res.second - attachMonitor(this, clock, scheduler, monitor) processTrace( this, @@ -187,8 +185,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - scheduler.terminate() + scheduler.close() monitor.close() + provisioner.close() } runSimulation() @@ -210,7 +209,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> { return Sc20ParquetTraceReader( listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), emptyMap(), @@ -242,7 +241,7 @@ class CapelinIntegrationTest { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { totalRequestedBurst += requestedBurst diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index fc979363..7b9d70ed 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -24,13 +24,14 @@ package org.opendc.experiments.sc18 import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer @@ -91,16 +92,17 @@ public class UnderspecificationExperiment : Experiment("underspecification") { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService( - testScope, + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService( + testScope.coroutineContext, clock, - bareMetal, - NumberOfActiveServersAllocationPolicy(), tracer, - SimSpaceSharedHypervisorProvider(), - schedulingQuantum = 1000 + NumberOfActiveServersAllocationPolicy(), ) + hosts.forEach { compute.addHost(it) } + // Wait for the hypervisors to be spawned delay(10) @@ -108,7 +110,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") { testScope, clock, tracer, - provisioner, + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts index cd26c077..37e9c9c8 100644 --- a/simulator/opendc-format/build.gradle.kts +++ b/simulator/opendc-format/build.gradle.kts @@ -31,7 +31,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-core")) - api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-workflows")) implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation(project(":opendc-compute:opendc-compute-simulator")) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 2e3e4a73..bbbbe87c 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -26,14 +26,14 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.core.Environment import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 6ec8ba4a..998f9cd6 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -23,9 +23,6 @@ package org.opendc.format.environment.sc20 import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.core.Environment @@ -33,6 +30,9 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.NODE_CLUSTER +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index a58a2524..6cf65f7f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -26,8 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.core.Environment @@ -35,6 +33,8 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 90d751ea..1571b17d 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.format.trace.bitbrains -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -45,17 +45,17 @@ import kotlin.math.min public class BitbrainsTraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() var timestampCol = 0 var coreCol = 0 @@ -131,19 +131,20 @@ public class BitbrainsTraceReader( .toSortedSet() ) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(flopsHistory.asSequence()) + val vmWorkload = ComputeWorkload( uuid, "VM Workload $vmId", UnnamedUser, - SimWorkloadImage( + Image( uuid, vmId.toString(), mapOf( IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to cores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(flopsHistory.asSequence()) + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) entries[vmId] = TraceEntryImpl( @@ -158,7 +159,7 @@ public class BitbrainsTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} @@ -175,6 +176,6 @@ public class BitbrainsTraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index c76889c8..cd7aff3c 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -22,7 +22,7 @@ package org.opendc.format.trace.gwf -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -41,7 +41,6 @@ import kotlin.collections.List import kotlin.collections.MutableSet import kotlin.collections.component1 import kotlin.collections.component2 -import kotlin.collections.emptyMap import kotlin.collections.filter import kotlin.collections.forEach import kotlin.collections.getOrPut @@ -136,10 +135,11 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) } val workflow = entry.workload + val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)), + Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)), HashSet(), mapOf( WORKFLOW_TASK_CORES to cores, diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index 78f581ca..07785632 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.format.trace.sc20 -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -49,17 +49,17 @@ public class Sc20TraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>() + val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>() val timestampCol = 0 val cpuUsageCol = 1 @@ -156,19 +156,20 @@ public class Sc20TraceReader( performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), Random(random.nextInt()) ) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(flopsFragments.asSequence()) + val vmWorkload = ComputeWorkload( uuid, "VM Workload $vmId", UnnamedUser, - SimWorkloadImage( + Image( uuid, vmId, mapOf( IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to cores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(flopsFragments.asSequence()) + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) entries[uuid] = TraceEntryImpl( @@ -183,7 +184,7 @@ public class Sc20TraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} @@ -200,6 +201,6 @@ public class Sc20TraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index 80c54354..ead20c35 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.format.trace.swf -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -43,17 +43,17 @@ import java.util.* public class SwfTraceReader( file: File, maxNumCores: Int = -1 -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() val jobNumberCol = 0 val submitTimeCol = 1 // seconds (begin of trace is 0) @@ -154,18 +154,19 @@ public class SwfTraceReader( } val uuid = UUID(0L, jobNumber) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(flopsHistory.asSequence()) + val vmWorkload = ComputeWorkload( uuid, "SWF Workload $jobNumber", UnnamedUser, - SimWorkloadImage( + Image( uuid, jobNumber.toString(), mapOf( "cores" to cores, - "required-memory" to memory - ), - SimTraceWorkload(flopsHistory.asSequence()) + "required-memory" to memory, + "workload" to workload + ) ) ) @@ -179,7 +180,7 @@ public class SwfTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} @@ -196,6 +197,6 @@ public class SwfTraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index d7dc09fa..5a271fab 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,7 +25,7 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -78,10 +78,17 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) } val workflow = entry.workload + val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)), + Image( + UUID.randomUUID(), + "<unnamed>", + mapOf( + "workload" to workload + ) + ), HashSet(), mapOf( WORKFLOW_TASK_CORES to cores, diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt index 45c125c4..7e3d2623 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -24,7 +24,6 @@ package org.opendc.format.trace.swf import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import org.opendc.compute.simulator.SimWorkloadImage import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File @@ -35,12 +34,12 @@ class SwfTraceReaderTest { var entry = reader.next() assertEquals(0, entry.submissionTime) // 1961 slices for waiting, 3 full and 1 partial running slices - assertEquals(1965, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size) + assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) entry = reader.next() assertEquals(164472, entry.submissionTime) // 1188 slices for waiting, 0 full and 1 partial running slices - assertEquals(1189, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size) - assertEquals(0.25, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().usage) + assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) + assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage) } } diff --git a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts b/simulator/opendc-metal/build.gradle.kts index a1b6ec0f..9207de18 100644 --- a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts +++ b/simulator/opendc-metal/build.gradle.kts @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Core implementation of the OpenDC Compute service" +description = "Bare-metal provisioning in OpenDC" /* Build configuration */ plugins { @@ -30,6 +30,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-core")) + api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Metadata.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt index 11eadd87..ca98dab0 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Metadata.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal /* * Common metadata keys for bare-metal nodes. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt index 6d9506f1..1c5c7a8d 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image import org.opendc.core.Identity import java.util.UUID @@ -53,14 +53,14 @@ public data class Node( public val state: NodeState, /** - * The boot image of the node. + * The flavor of the node. */ - public val image: Image, + public val flavor: Flavor, /** - * The server instance that is running on the node or `null` if no server is running. + * The boot image of the node. */ - public val server: Server?, + public val image: Image, /** * The events that are emitted by the node. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeEvent.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt index 4423e2bf..30ce423c 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeEvent.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal /** * An event that is emitted by a [Node]. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeState.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt index bdc4841e..f1d4ea2e 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeState.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal /** * An enumeration describing the possible states of a bare-metal compute node. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/driver/BareMetalDriver.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt index 9db57127..3b15be94 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/driver/BareMetalDriver.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,13 +20,13 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal.driver +package org.opendc.metal.driver import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node +import org.opendc.compute.api.Image +import org.opendc.compute.api.Server import org.opendc.core.services.AbstractServiceKey +import org.opendc.metal.Node import java.util.UUID /** diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/ProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt index bad5b47c..6548767e 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/ProvisioningService.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal.service +package org.opendc.metal.service -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.api.Image import org.opendc.core.services.AbstractServiceKey +import org.opendc.metal.Node +import org.opendc.metal.driver.BareMetalDriver import java.util.UUID /** diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/SimpleProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt index 5222f2fb..2d6353c8 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/SimpleProvisioningService.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal.service +package org.opendc.metal.service import kotlinx.coroutines.CancellationException -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.api.Image +import org.opendc.metal.Node +import org.opendc.metal.driver.BareMetalDriver /** * A very basic implementation of the [ProvisioningService]. diff --git a/simulator/opendc-platform/build.gradle.kts b/simulator/opendc-platform/build.gradle.kts index 0ae1c72a..ea0591ad 100644 --- a/simulator/opendc-platform/build.gradle.kts +++ b/simulator/opendc-platform/build.gradle.kts @@ -33,5 +33,6 @@ dependencies { api("io.github.microutils:kotlin-logging:${versions.kotlinLogging}") runtime("org.slf4j:slf4j-simple:${versions.slf4j}") + runtime("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}") } } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 533bf321..482fe754 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -40,6 +40,11 @@ import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document import org.bson.types.ObjectId +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -242,7 +247,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val tracer = EventTracer(clock) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -281,7 +286,8 @@ public class RunnerCli : CliktCommand(name = "runner") { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index f43d0869..2f11347d 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -32,9 +32,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document import org.bson.types.ObjectId -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.core.Environment @@ -42,6 +39,9 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.NODE_CLUSTER +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index f16f9b90..fe814c76 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -23,12 +23,14 @@ package org.opendc.runner.web import mu.KotlinLogging -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.telemetry.HostEvent +import org.opendc.metal.Node +import org.opendc.metal.NodeState import kotlin.math.max /** @@ -36,10 +38,10 @@ import kotlin.math.max */ public class WebExperimentMonitor : ExperimentMonitor { private val logger = KotlinLogging.logger {} - private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private val currentHostEvent = mutableMapOf<Node, HostEvent>() private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) { + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { if (startTime < 0) { startTime = time @@ -50,12 +52,12 @@ public class WebExperimentMonitor : ExperimentMonitor { override fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } - val previousEvent = currentHostEvent[server] + val previousEvent = currentHostEvent[host] val roundedTime = previousEvent?.let { val duration = time - it.timestamp @@ -78,13 +80,13 @@ public class WebExperimentMonitor : ExperimentMonitor { 0.0, 0.0, 0, - server + host ) } - private val lastPowerConsumption = mutableMapOf<Server, Double>() + private val lastPowerConsumption = mutableMapOf<Node, Double>() - override fun reportPowerConsumption(host: Server, draw: Double) { + override fun reportPowerConsumption(host: Node, draw: Double) { lastPowerConsumption[host] = draw } @@ -97,16 +99,16 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { - val previousEvent = currentHostEvent[hostServer] + val previousEvent = currentHostEvent[host] when { previousEvent == null -> { val event = HostEvent( time, 5 * 60 * 1000L, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -114,17 +116,17 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } previousEvent.timestamp == time -> { val event = HostEvent( time, previousEvent.duration, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -132,11 +134,11 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } else -> { processHostEvent(previousEvent) @@ -144,7 +146,7 @@ public class WebExperimentMonitor : ExperimentMonitor { val event = HostEvent( time, time - previousEvent.timestamp, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -152,17 +154,17 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } } } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap<Server, HostMetrics> = mutableMapOf() + private val hostMetrics: MutableMap<Node, HostMetrics> = mutableMapOf() private fun processHostEvent(event: HostEvent) { val slices = event.duration / SLICE_LENGTH @@ -173,14 +175,14 @@ public class WebExperimentMonitor : ExperimentMonitor { hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)), - hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0, - hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0 + hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0, + hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0 ) - hostMetrics.compute(event.host) { key, prev -> + hostMetrics.compute(event.node) { _, prev -> HostMetrics( - (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), event.vmCount + (prev?.vmCount ?: 0), 1 + (prev?.count ?: 0) ) @@ -208,7 +210,7 @@ public class WebExperimentMonitor : ExperimentMonitor { private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerMetrics = AggregateProvisionerMetrics( max(event.totalVmCount, provisionerMetrics.vmTotalCount), max(event.waitingVmCount, provisionerMetrics.vmWaitingCount), diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 812b5f20..f74c5697 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -84,33 +84,33 @@ public class SimBareMetalMachine( private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock) /** - * The execution context in which the workload runs. - */ - private val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimBareMetalMachine.model - - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = this@SimBareMetalMachine.model + + override val clock: Clock + get() = this@SimBareMetalMachine.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { Cpu(it, workload) } + this.cpus = model.cpus.map { Cpu(ctx, it, workload) } for (cpu in cpus) { cpu.start() @@ -161,7 +161,7 @@ public class SimBareMetalMachine( /** * A physical CPU of the machine. */ - private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) { + private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) { /** * The current command. */ diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt index c7c3d3cc..657dac66 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt @@ -41,6 +41,11 @@ public interface SimExecutionContext { public val machine: SimMachineModel /** + * The metadata associated with the context. + */ + public val meta: Map<String, Any> + + /** * Ask the host machine to interrupt the specified vCPU. * * @param cpu The id of the vCPU to interrupt. diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 5e86d32b..bf6d8a5e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -336,33 +336,33 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener private var cpus: List<VCpu> = emptyList() /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimFairShareHypervisor.ctx.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { VCpu(this, it, workload) } + this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) } for (cpu in cpus) { // Register vCPU to scheduler @@ -417,7 +417,12 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable<VCpu> { + private inner class VCpu( + val vm: SimVm, + val ctx: SimExecutionContext, + val model: ProcessingUnit, + val workload: SimWorkload + ) : Comparable<VCpu> { /** * The latest command processed by the CPU. */ @@ -488,7 +493,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener isIntermediate = true latestFlush = ctx.clock.millis() - process(workload.onStart(vm.ctx, model.id)) + process(workload.onStart(ctx, model.id)) } catch (e: Throwable) { fail(e) } finally { @@ -514,7 +519,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener // Act like nothing has happened in case the vCPU did not reach its deadline or was not // interrupted by the user. if (interrupt || command.deadline <= now) { - process(workload.onNext(vm.ctx, model.id, 0.0)) + process(workload.onNext(ctx, model.id, 0.0)) } } is SimResourceCommand.Consume -> { @@ -545,7 +550,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener totalOvercommittedWork += remainingWork } - process(workload.onNext(vm.ctx, model.id, remainingWork)) + process(workload.onNext(ctx, model.id, remainingWork)) } else { process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ea8eeb37..bfaa60bc 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -42,7 +42,7 @@ public interface SimMachine : AutoCloseable { /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - public suspend fun run(workload: SimWorkload) + public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) /** * Terminate this machine. diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 66d3eda7..778b68ca 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -117,33 +117,33 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen private var cpus: List<VCpu> = emptyList() /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimSpaceSharedHypervisor.ctx.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) } + this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) } for (cpu in cpus) { cpu.start() @@ -193,7 +193,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { + private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { /** * The processing speed of the vCPU. */ @@ -267,7 +267,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen * Interrupt the CPU. */ fun interrupt() { - ctx.interrupt(pCPU) + this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU) } /** diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt index 948595b1..10f29f4e 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt @@ -58,12 +58,22 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) private class EventFlowImpl<T> : EventFlow<T> { private var closed: Boolean = false - private val subscribers = HashMap<SendChannel<T>, Unit>() + private val subscribers = mutableListOf<SendChannel<T>>() override fun emit(event: T) { + if (closed) { + return + } + + val it = subscribers.iterator() synchronized(this) { - for ((chan, _) in subscribers) { - chan.offer(event) + while (it.hasNext()) { + val chan = it.next() + if (chan.isClosedForSend) { + it.remove() + } else { + chan.offer(event) + } } } } @@ -72,9 +82,11 @@ private class EventFlowImpl<T> : EventFlow<T> { synchronized(this) { closed = true - for ((chan, _) in subscribers) { + for (chan in subscribers) { chan.close() } + + subscribers.clear() } } @@ -87,9 +99,13 @@ private class EventFlowImpl<T> : EventFlow<T> { } channel = Channel(Channel.UNLIMITED) - subscribers[channel] = Unit + subscribers.add(channel) + } + try { + channel.consumeAsFlow().collect(collector) + } finally { + channel.close() } - channel.consumeAsFlow().collect(collector) } override fun toString(): String = "EventFlow" diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts index b4ffac7d..b6a2fc45 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflows/build.gradle.kts @@ -29,15 +29,16 @@ plugins { } dependencies { + api(platform(project(":opendc-platform"))) api(project(":opendc-core")) - api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) - implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}") + implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-compute:opendc-compute-simulator")) testImplementation(project(":opendc-format")) testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") - testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index e04c8a4c..6b348ed4 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -25,16 +25,10 @@ package org.opendc.workflows.service import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.virt.service.VirtProvisioningService +import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable @@ -55,13 +49,13 @@ public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, internal val tracer: EventTracer, - private val provisioningService: VirtProvisioningService, + private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, taskOrderPolicy: TaskOrderPolicy -) : WorkflowService { +) : WorkflowService, ServerWatcher { /** * The logger instance to use. */ @@ -103,12 +97,6 @@ public class StageWorkflowService( internal val taskByServer = mutableMapOf<Server, TaskState>() /** - * The load of the system. - */ - internal val load: Double - get() = (activeTasks.size / provisioningService.hostCount.toDouble()) - - /** * The root listener of this scheduler. */ private val rootListener = object : StageWorkflowSchedulerListener { @@ -205,7 +193,7 @@ public class StageWorkflowService( /** * Indicate to the scheduler that a scheduling cycle is needed. */ - private suspend fun requestCycle() = mode.requestCycle() + private fun requestCycle() = mode.requestCycle() /** * Perform a scheduling cycle immediately. @@ -273,15 +261,13 @@ public class StageWorkflowService( val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task val image = instance.task.image coroutineScope.launch { - val server = provisioningService.deploy(instance.task.name, image, flavor) + val server = computeClient.newServer(instance.task.name, image, flavor) instance.state = TaskStatus.ACTIVE instance.server = server taskByServer[server] = instance - server.events - .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(coroutineScope) + server.watch(this@StageWorkflowService) } activeTasks += instance @@ -290,8 +276,8 @@ public class StageWorkflowService( } } - private suspend fun stateChanged(server: Server) { - when (server.state) { + public override fun onStateChanged(server: Server, newState: ServerState) { + when (newState) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt index d1eb6704..ef9714c2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt @@ -22,7 +22,7 @@ package org.opendc.workflows.service -import org.opendc.compute.core.Server +import org.opendc.compute.api.Server import org.opendc.workflows.workload.Task public class TaskState(public val job: JobState, public val task: Task) { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt index d03adc61..cf8f92e0 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package org.opendc.workflows.service import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.yield import org.opendc.workflows.service.stage.StagePolicy /** @@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo /** * Request a new scheduling cycle to be performed. */ - public suspend fun requestCycle() + public fun requestCycle() } /** @@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo */ public object Interactive : WorkflowSchedulerMode() { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { - yield() - scheduler.schedule() + override fun requestCycle() { + scheduler.coroutineScope.launch { scheduler.schedule() } } } @@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. @@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { val delay = random.nextInt(200).toLong() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt deleted file mode 100644 index 4f0c269a..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.job - -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService - -/** - * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load. - * - * @property limit The maximum load before stopping admission. - */ -public data class LoadJobAdmissionPolicy(public val limit: Double) : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { - override fun invoke( - job: JobState - ): JobAdmissionPolicy.Advice = - if (scheduler.load < limit) - JobAdmissionPolicy.Advice.ADMIT - else - JobAdmissionPolicy.Advice.STOP - } - - override fun toString(): String = "Limit-Load($limit)" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt deleted file mode 100644 index a80a8c63..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.task - -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState - -/** - * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load. - */ -public data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { - override fun invoke( - task: TaskState - ): TaskEligibilityPolicy.Advice = - if (scheduler.load < limit) - TaskEligibilityPolicy.Advice.ADMIT - else - TaskEligibilityPolicy.Advice.STOP - } - - override fun toString(): String = "Limit-Load($limit)" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt index 1834a4c8..4c6d2842 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt @@ -24,7 +24,7 @@ package org.opendc.workflows.workload -import org.opendc.compute.core.image.Image +import org.opendc.compute.api.Image import org.opendc.core.Identity import java.util.* diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 2bfcba35..4207cdfd 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -36,11 +36,12 @@ import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer @@ -80,7 +81,11 @@ internal class StageWorkflowSchedulerIntegrationTest { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000) + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } // Wait for the hypervisors to be spawned delay(10) @@ -89,7 +94,7 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope, clock, tracer, - provisioner, + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflows/src/test/resources/log4j2.xml new file mode 100644 index 00000000..70a0eacc --- /dev/null +++ b/simulator/opendc-workflows/src/test/resources/log4j2.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Copyright (c) 2021 AtLarge Research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN" packages="org.apache.logging.log4j.core"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/> + </Console> + </Appenders> + <Loggers> + <Root level="warn"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 77d78318..7a82adcd 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -23,8 +23,10 @@ rootProject.name = "opendc-simulator" include(":opendc-platform") include(":opendc-core") -include(":opendc-compute:opendc-compute-core") +include(":opendc-compute:opendc-compute-api") +include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") +include(":opendc-metal") include(":opendc-workflows") include(":opendc-format") include(":opendc-experiments:opendc-experiments-sc18") |
