diff options
103 files changed, 2019 insertions, 1730 deletions
diff --git a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts new file mode 100644 index 00000000..10046322 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "API interface for the OpenDC Compute service" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-core")) +} diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt new file mode 100644 index 00000000..025513e6 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * A client interface for the OpenDC Compute service. + */ +public interface ComputeClient : AutoCloseable { + /** + * Create a new [Server] instance at this compute service. + * + * @param name The name of the server to deploy. + * @param image The image to be deployed. + * @param flavor The flavor of the machine instance to run this [image] on. + */ + public suspend fun newServer( + name: String, + image: Image, + flavor: Flavor + ): Server + + /** + * Release the resources associated with this client, preventing any further API calls. + */ + public override fun close() +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt index 6c724277..64a47277 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,8 @@ * SOFTWARE. */ -package org.opendc.compute.core.workload +package org.opendc.compute.api -import org.opendc.compute.core.image.Image import org.opendc.core.User import org.opendc.core.workload.Workload import java.util.UUID @@ -35,13 +34,13 @@ import java.util.UUID * @property owner The owner of the VM. * @property image The image of the VM. */ -public data class VmWorkload( +public data class ComputeWorkload( override val uid: UUID, override val name: String, override val owner: User, val image: Image ) : Workload { - override fun equals(other: Any?): Boolean = other is VmWorkload && uid == other.uid + override fun equals(other: Any?): Boolean = other is ComputeWorkload && uid == other.uid override fun hashCode(): Int = uid.hashCode() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Flavor.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt index e5ca115f..bf5f0ce4 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Flavor.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.api /** * Flavors define the compute and memory capacity of [Server] instance. To put it simply, a flavor is an available diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt index e481fcc3..280c4d89 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/Image.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.core.image +package org.opendc.compute.api import org.opendc.core.resource.Resource +import org.opendc.core.resource.TagContainer +import java.util.* /** * An image containing a bootable operating system that can directly be executed by physical or virtual server. @@ -32,4 +34,15 @@ import org.opendc.core.resource.Resource * useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server * configuration frequently. */ -public interface Image : Resource +public data class Image( + public override val uid: UUID, + public override val name: String, + public override val tags: TagContainer +) : Resource { + public companion object { + /** + * An empty boot disk [Image] that exits immediately on start. + */ + public val EMPTY: Image = Image(UUID.randomUUID(), "empty", emptyMap()) + } +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt index 948f622f..ab1eb860 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,59 +20,55 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.api -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.image.Image import org.opendc.core.resource.Resource -import org.opendc.core.resource.TagContainer -import org.opendc.core.services.ServiceRegistry -import java.util.UUID /** - * A server instance that is running on some physical or virtual machine. + * A stateful object representing a server instance that is running on some physical or virtual machine. */ -public data class Server( +public interface Server : Resource { /** - * The unique identifier of the server. + * The name of the server. */ - public override val uid: UUID, + public override val name: String /** - * The optional name of the server. + * The flavor of the server. */ - public override val name: String, + public val flavor: Flavor /** - * The tags of this server. + * The image of the server. */ - public override val tags: TagContainer, + public val image: Image /** - * The hardware configuration of the server. + * The tags assigned to the server. */ - public val flavor: Flavor, + public override val tags: Map<String, String> /** - * The image running on the server. + * The last known state of the server. */ - public val image: Image, + public val state: ServerState /** - * The last known state of the server. + * Register the specified [ServerWatcher] to watch the state of the server. + * + * @param watcher The watcher to register for the server. */ - public val state: ServerState, + public fun watch(watcher: ServerWatcher) /** - * The services published by this server. + * De-register the specified [ServerWatcher] from the server to stop it from receiving events. + * + * @param watcher The watcher to de-register from the server. */ - public val services: ServiceRegistry, + public fun unwatch(watcher: ServerWatcher) /** - * The events that are emitted by the server. + * Refresh the local state of the resource. */ - public val events: Flow<ServerEvent> -) : Resource { - override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Server && uid == other.uid + public suspend fun refresh() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerState.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt index 4b9d7c13..25d2e519 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerState.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.api /** * An enumeration describing the possible states of a server. diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt new file mode 100644 index 00000000..48a17b30 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.api + +/** + * An interface used to watch the state of [Server] instances. + */ +public interface ServerWatcher { + /** + * This method is invoked when the state of a [Server] changes. + * + * Note that the state of [server] might not reflect the state as reported by the invocation, as a call to + * [Server.refresh] is required to update its state. + * + * @param server The server whose state has changed. + * @param newState The new state of the server. + */ + public fun onStateChanged(server: Server, newState: ServerState) {} +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt deleted file mode 100644 index 1ae52baa..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt - -import kotlinx.coroutines.flow.Flow -import org.opendc.core.Identity -import java.util.UUID - -/** - * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment - * into several virtual guest machines. - */ -public class Hypervisor( - /** - * The unique identifier of the hypervisor. - */ - override val uid: UUID, - - /** - * The optional name of the hypervisor. - */ - override val name: String, - - /** - * Metadata of the hypervisor. - */ - public val metadata: Map<String, Any>, - - /** - * The events that are emitted by the hypervisor. - */ - public val events: Flow<HypervisorEvent> -) : Identity { - override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid -} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt deleted file mode 100644 index 6fe84ea6..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt +++ /dev/null @@ -1,3 +0,0 @@ -package org.opendc.compute.core.virt.driver - -public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt deleted file mode 100644 index 68cc7b50..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt.driver - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.core.services.AbstractServiceKey -import java.util.UUID - -/** - * A driver interface for a hypervisor running on some host server and communicating with the central compute service to - * provide virtualization for that particular resource. - */ -public interface VirtDriver { - /** - * The events emitted by the driver. - */ - public val events: Flow<HypervisorEvent> - - /** - * Determine whether the specified [flavor] can still fit on this driver. - */ - public fun canFit(flavor: Flavor): Boolean - - /** - * Spawn the given [Image] on the compute resource of this driver. - * - * @param name The name of the server to spawn. - * @param image The image to deploy. - * @param flavor The flavor of the server which this driver is controlling. - * @return The virtual server spawned by this method. - */ - public suspend fun spawn( - name: String, - image: Image, - flavor: Flavor - ): Server - - public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") -} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt deleted file mode 100644 index 3d722110..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt.service - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.driver.VirtDriver - -/** - * A service for VM provisioning on a cloud. - */ -public interface VirtProvisioningService { - /** - * The events emitted by the service. - */ - public val events: Flow<VirtProvisioningEvent> - - /** - * Obtain the active hypervisors for this provisioner. - */ - public suspend fun drivers(): Set<VirtDriver> - - /** - * The number of hosts available in the system. - */ - public val hostCount: Int - - /** - * Submit the specified [Image] to the provisioning service. - * - * @param name The name of the server to deploy. - * @param image The image to be deployed. - * @param flavor The flavor of the machine instance to run this [image] on. - */ - public suspend fun deploy( - name: String, - image: Image, - flavor: org.opendc.compute.core.Flavor - ): Server - - /** - * Terminate the provisioning service releasing all the leased bare-metal machines. - */ - public suspend fun terminate() -} diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts new file mode 100644 index 00000000..1b09ef6d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "OpenDC Compute Service implementation" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-compute:opendc-compute-api")) + api(project(":opendc-trace:opendc-trace-core")) + implementation(project(":opendc-utils")) + implementation("io.github.microutils:kotlin-logging") + + testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt new file mode 100644 index 00000000..593e4b56 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.ComputeClient +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.trace.core.EventTracer +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The [ComputeService] hosts the API implementation of the OpenDC Compute service. + */ +public interface ComputeService : AutoCloseable { + /** + * The events emitted by the service. + */ + public val events: Flow<ComputeServiceEvent> + + /** + * The hosts that are used by the compute service. + */ + public val hosts: Set<Host> + + /** + * The number of hosts available in the system. + */ + public val hostCount: Int + + /** + * Create a new [ComputeClient] to control the compute service. + */ + public fun newClient(): ComputeClient + + /** + * Add a [host] to the scheduling pool of the compute service. + */ + public fun addHost(host: Host) + + /** + * Remove a [host] from the scheduling pool of the compute service. + */ + public fun removeHost(host: Host) + + /** + * Terminate the lifecycle of the compute service, stopping all running instances. + */ + public override fun close() + + public companion object { + /** + * Construct a new [ComputeService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param tracer The event tracer to use. + * @param allocationPolicy The allocation policy to use. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + tracer: EventTracer, + allocationPolicy: AllocationPolicy, + schedulingQuantum: Long = 300000, + ): ComputeService { + return ComputeServiceImpl(context, clock, tracer, allocationPolicy, schedulingQuantum) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt index abd2fc95..193008a7 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,22 +20,22 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service +package org.opendc.compute.service /** - * An event that is emitted by the [VirtProvisioningService]. + * An event that is emitted by the [ComputeService]. */ -public sealed class VirtProvisioningEvent { +public sealed class ComputeServiceEvent { /** * The service that has emitted the event. */ - public abstract val provisioner: VirtProvisioningService + public abstract val provisioner: ComputeService /** * An event emitted for writing metrics. */ public data class MetricsAvailable( - override val provisioner: VirtProvisioningService, + override val provisioner: ComputeService, public val totalHostCount: Int, public val availableHostCount: Int, public val totalVmCount: Int, @@ -45,5 +43,5 @@ public sealed class VirtProvisioningEvent { public val inactiveVmCount: Int, public val waitingVmCount: Int, public val failedVmCount: Int - ) : VirtProvisioningEvent() + ) : ComputeServiceEvent() } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt new file mode 100644 index 00000000..2cd91144 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.Server +import java.util.* + +/** + * Base interface for representing compute resources that host virtualized [Server] instances. + */ +public interface Host { + /** + * A unique identifier representing the host. + */ + public val uid: UUID + + /** + * The name of this host. + */ + public val name: String + + /** + * The machine model of the host. + */ + public val model: HostModel + + /** + * The state of the host. + */ + public val state: HostState + + /** + * The events emitted by the driver. + */ + public val events: Flow<HostEvent> + + /** + * Determine whether the specified [instance][server] can still fit on this host. + */ + public fun canFit(server: Server): Boolean + + /** + * Register the specified [instance][server] on the host. + * + * Once the method returns, the instance should be running if [start] is true or else the instance should be + * stopped. + */ + public suspend fun spawn(server: Server, start: Boolean = true) + + /** + * Determine whether the specified [instance][server] exists on the host. + */ + public operator fun contains(server: Server): Boolean + + /** + * Stat the server [instance][server] if it is currently not running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun start(server: Server) + + /** + * Stop the server [instance][server] if it is currently running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun stop(server: Server) + + /** + * Terminate the specified [instance][server] on this host and cleanup all resources associated with it. + */ + public suspend fun terminate(server: Server) + + /** + * Add a [HostListener] to this host. + */ + public fun addListener(listener: HostListener) + + /** + * Remove a [HostListener] from this host. + */ + public fun removeListener(listener: HostListener) +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt index 9fb437de..97350679 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt - -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver +package org.opendc.compute.service.driver /** - * An event that is emitted by a [VirtDriver]. + * An event that is emitted by a [Host]. */ -public sealed class HypervisorEvent { +public sealed class HostEvent { /** * The driver that emitted the event. */ - public abstract val driver: VirtDriver + public abstract val driver: Host /** * This event is emitted when the number of active servers on the server managed by this driver is updated. @@ -42,10 +39,10 @@ public sealed class HypervisorEvent { * @property availableMemory The available memory, in MB. */ public data class VmsUpdated( - override val driver: VirtDriver, + override val driver: Host, public val numberOfActiveServers: Int, public val availableMemory: Long - ) : HypervisorEvent() + ) : HostEvent() /** * This event is emitted when a slice is finished. @@ -63,7 +60,7 @@ public sealed class HypervisorEvent { * @property numberOfDeployedImages The number of images deployed on this hypervisor. */ public data class SliceFinished( - override val driver: VirtDriver, + override val driver: Host, public val requestedBurst: Long, public val grantedBurst: Long, public val overcommissionedBurst: Long, @@ -71,6 +68,5 @@ public sealed class HypervisorEvent { public val cpuUsage: Double, public val cpuDemand: Double, public val numberOfDeployedImages: Int, - public val hostServer: Server - ) : HypervisorEvent() + ) : HostEvent() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt index e9212832..f076cae3 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,22 +20,22 @@ * SOFTWARE. */ -package org.opendc.compute.core +package org.opendc.compute.service.driver + +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState /** - * An event that is emitted by a [Server]. + * Listener interface for events originating from a [Host]. */ -public sealed class ServerEvent { +public interface HostListener { /** - * The server that emitted the event. + * This method is invoked when the state of an [instance][server] on [host] changes. */ - public abstract val server: Server + public fun onStateChanged(host: Host, server: Server, newState: ServerState) {} /** - * This event is emitted when the state of [server] changes. - * - * @property server The server of which the state changed. - * @property previousState The previous state of the server. + * This method is invoked when the state of a [Host] has changed. */ - public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent() + public fun onStateChanged(host: Host, newState: HostState) {} } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt new file mode 100644 index 00000000..5632a55e --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver + +/** + * Describes the static machine properties of the host. + * + * @property vcpuCount The number of logical processing cores available for this host. + * @property memorySize The amount of memory available for this host in MB. + */ +public data class HostModel(public val cpuCount: Int, public val memorySize: Long) diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt new file mode 100644 index 00000000..6d85ee2d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver + +/** + * The state of a host. + */ +public enum class HostState { + /** + * The host is up. + */ + UP, + + /** + * The host is down. + */ + DOWN +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt index c1802e64..a7974062 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event import java.util.* diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt index 1fea21ea..75bb09ed 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event import java.util.* diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt index 662068dd..f59c74b7 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt index 96103129..eaf0736b 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt index f6b71e22..fa0a8a13 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.image.Image +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image import org.opendc.trace.core.Event /** diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt index d0e5c102..52b91616 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt new file mode 100644 index 00000000..f84b7435 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import java.util.* + +/** + * A [Server] implementation that is passed to clients but delegates its implementation to another class. + */ +internal class ClientServer(private val delegate: Server) : Server, ServerWatcher { + private val watchers = mutableListOf<ServerWatcher>() + + override val uid: UUID = delegate.uid + + override var name: String = delegate.name + private set + + override var flavor: Flavor = delegate.flavor + private set + + override var image: Image = delegate.image + private set + + override var tags: Map<String, String> = delegate.tags.toMap() + private set + + override var state: ServerState = delegate.state + private set + + override fun watch(watcher: ServerWatcher) { + if (watchers.isEmpty()) { + delegate.watch(this) + } + + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers += watcher + + if (watchers.isEmpty()) { + delegate.unwatch(this) + } + } + + override suspend fun refresh() { + name = delegate.name + flavor = delegate.flavor + image = delegate.image + tags = delegate.tags + state = delegate.state + } + + override fun onStateChanged(server: Server, newState: ServerState) { + val watchers = watchers + + for (watcher in watchers) { + watcher.onStateChanged(this, newState) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt new file mode 100644 index 00000000..69d6bb59 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -0,0 +1,414 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import mu.KotlinLogging +import org.opendc.compute.api.* +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.events.* +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.trace.core.EventTracer +import org.opendc.utils.TimerScheduler +import org.opendc.utils.flow.EventFlow +import java.time.Clock +import java.util.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.math.max + +/** + * Internal implementation of the OpenDC Compute service. + * + * @param context The [CoroutineContext] to use. + * @param clock The clock instance to keep track of time. + */ +public class ComputeServiceImpl( + private val context: CoroutineContext, + private val clock: Clock, + private val tracer: EventTracer, + private val allocationPolicy: AllocationPolicy, + private val schedulingQuantum: Long +) : ComputeService, HostListener { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context) + + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The [Random] instance used to generate unique identifiers for the objects. + */ + private val random = Random(0) + + /** + * A mapping from host to host view. + */ + private val hostToView = mutableMapOf<Host, HostView>() + + /** + * The available hypervisors. + */ + private val availableHosts: MutableSet<HostView> = mutableSetOf() + + /** + * The servers that should be launched by the service. + */ + private val queue: Deque<LaunchRequest> = ArrayDeque() + + /** + * The active servers in the system. + */ + private val activeServers: MutableSet<Server> = mutableSetOf() + + public var submittedVms: Int = 0 + public var queuedVms: Int = 0 + public var runningVms: Int = 0 + public var finishedVms: Int = 0 + public var unscheduledVms: Int = 0 + + private var maxCores = 0 + private var maxMemory = 0L + + /** + * The allocation logic to use. + */ + private val allocationLogic = allocationPolicy() + + override val events: Flow<ComputeServiceEvent> + get() = _events + private val _events = EventFlow<ComputeServiceEvent>() + + /** + * The [TimerScheduler] to use for scheduling the scheduler cycles. + */ + private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope, clock) + + override val hosts: Set<Host> + get() = hostToView.keys + + override val hostCount: Int + get() = hostToView.size + + override fun newClient(): ComputeClient = object : ComputeClient { + private var isClosed: Boolean = false + + override suspend fun newServer(name: String, image: Image, flavor: Flavor): Server { + check(!isClosed) { "Client is closed" } + tracer.commit(VmSubmissionEvent(name, image, flavor)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + return suspendCancellableCoroutine { cont -> + val request = LaunchRequest(createServer(name, image, flavor), cont) + queue += request + requestCycle() + } + } + + override fun close() { + isClosed = true + } + + override fun toString(): String = "ComputeClient" + } + + override fun addHost(host: Host) { + // Check if host is already known + if (host in hostToView) { + return + } + + val hv = HostView(host) + maxCores = max(maxCores, host.model.cpuCount) + maxMemory = max(maxMemory, host.model.memorySize) + hostToView[host] = hv + + if (host.state == HostState.UP) { + availableHosts += hv + } + + host.addListener(this) + } + + override fun removeHost(host: Host) { + host.removeListener(this) + } + + override fun close() { + scope.cancel() + } + + private fun createServer( + name: String, + image: Image, + flavor: Flavor + ): Server { + return ServerImpl( + uid = UUID(random.nextLong(), random.nextLong()), + name = name, + flavor = flavor, + image = image + ) + } + + private fun requestCycle() { + // Bail out in case we have already requested a new cycle. + if (scheduler.isTimerActive(Unit)) { + return + } + + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // This is important because the slices of the VMs need to be aligned. + // We calculate here the delay until the next scheduling slot. + val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + + scheduler.startSingleTimer(Unit, delay) { + schedule() + } + } + + private fun schedule() { + while (queue.isNotEmpty()) { + val (server, cont) = queue.peekFirst() + val requiredMemory = server.flavor.memorySize + val selectedHv = allocationLogic.select(availableHosts, server) + + if (selectedHv == null || !selectedHv.host.canFit(server)) { + logger.trace { "Server $server selected for scheduling but no capacity available for it." } + + if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { + tracer.commit(VmSubmissionInvalidEvent(server.name)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + runningVms, + finishedVms, + --queuedVms, + ++unscheduledVms + ) + ) + + // Remove the incoming image + queue.poll() + + logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + continue + } else { + break + } + } + + logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" } + queue.poll() + + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + selectedHv.numberOfActiveServers++ + selectedHv.provisionedCores += server.flavor.cpuCount + selectedHv.availableMemory -= requiredMemory // XXX Temporary hack + + scope.launch { + try { + cont.resume(ClientServer(server)) + selectedHv.host.spawn(server) + activeServers += server + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) + + selectedHv.numberOfActiveServers-- + selectedHv.provisionedCores -= server.flavor.cpuCount + selectedHv.availableMemory += requiredMemory + } + } + } + } + + override fun onStateChanged(host: Host, newState: HostState) { + when (newState) { + HostState.UP -> { + logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + + val hv = hostToView[host] + if (hv != null) { + // Corner case for when the hypervisor already exists + availableHosts += hv + } + + tracer.commit(HypervisorAvailableEvent(host.uid)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) + + // Re-schedule on the new machine + if (queue.isNotEmpty()) { + requestCycle() + } + } + HostState.DOWN -> { + logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + + val hv = hostToView[host] ?: return + availableHosts -= hv + + tracer.commit(HypervisorUnavailableEvent(hv.uid)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) + + if (queue.isNotEmpty()) { + requestCycle() + } + } + } + } + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + val serverImpl = server as ServerImpl + serverImpl.state = newState + serverImpl.watchers.forEach { it.onStateChanged(server, newState) } + + if (newState == ServerState.SHUTOFF) { + logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + + tracer.commit(VmStoppedEvent(server.name)) + + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) + + activeServers -= server + val hv = hostToView[host] + if (hv != null) { + hv.provisionedCores -= server.flavor.cpuCount + hv.numberOfActiveServers-- + hv.availableMemory += server.flavor.memorySize + } else { + logger.error { "Unknown host $host" } + } + + // Try to reschedule if needed + if (queue.isNotEmpty()) { + requestCycle() + } + } + } + + public data class LaunchRequest(val server: Server, val cont: Continuation<Server>) + + private class ServerImpl( + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image + ) : Server { + val watchers = mutableListOf<ServerWatcher>() + + override fun watch(watcher: ServerWatcher) { + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers -= watcher + } + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override val tags: Map<String, String> = emptyMap() + + override var state: ServerState = ServerState.BUILD + } +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index 01f86a1b..1bdfdf1a 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.core.image +package org.opendc.compute.service.internal -import org.opendc.core.resource.TagContainer +import org.opendc.compute.service.driver.Host import java.util.UUID -/** - * An empty boot disk [Image] that exits immediately on start. - */ -public object EmptyImage : Image { - override val uid: UUID = UUID.randomUUID() - override val name: String = "empty" - override val tags: TagContainer = emptyMap() +public class HostView(public val host: Host) { + public val uid: UUID + get() = host.uid + + public var numberOfActiveServers: Int = 0 + public var availableMemory: Long = host.model.memorySize + public var provisionedCores: Int = 0 } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt index 2018b9f2..5ee4c70f 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt @@ -20,11 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.core.metal.Node -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView /** * A policy for selecting the [Node] an image should be deployed to, @@ -38,9 +37,9 @@ public interface AllocationPolicy { * Select the node on which the server should be scheduled. */ public fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? + hypervisors: Set<HostView>, + server: Server + ): HostView? } /** diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt index 38a07b2b..ad422415 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * An [AllocationPolicy] that selects the machine with the highest/lowest amount of memory per core. @@ -31,8 +31,8 @@ import org.opendc.compute.simulator.HypervisorView */ public class AvailableCoreMemoryAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = - compareBy<HypervisorView> { -it.availableMemory / it.server.flavor.cpuCount } + override val comparator: Comparator<HostView> = + compareBy<HostView> { -it.availableMemory / it.host.model.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt index e87abd7b..6712b8a2 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * Allocation policy that selects the node with the most available memory. @@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView */ public class AvailableMemoryAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { -it.availableMemory } + override val comparator: Comparator<HostView> = compareBy<HostView> { -it.availableMemory } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt index 4470eab9..265d514d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView /** * The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node. @@ -32,18 +32,18 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { /** * The comparator to use. */ - public val comparator: Comparator<HypervisorView> + public val comparator: Comparator<HostView> override fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? { + hypervisors: Set<HostView>, + server: Server + ): HostView? { return hypervisors.asSequence() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.flavor.memorySize) - val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.flavor.memorySize) + val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } - .minWithOrNull(comparator.thenBy { it.server.uid }) + .minWithOrNull(comparator.thenBy { it.host.uid }) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt index 5e2b895c..29eba782 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * Allocation policy that selects the node with the least amount of active servers. @@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView */ public class NumberOfActiveServersAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { it.numberOfActiveServers } + override val comparator: Comparator<HostView> = compareBy<HostView> { it.numberOfActiveServers } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt index 4344d979..4c196953 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * An [AllocationPolicy] that takes into account the number of vCPUs that have been provisioned on this machine @@ -33,8 +33,8 @@ import org.opendc.compute.simulator.HypervisorView */ public class ProvisionedCoresAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = - compareBy<HypervisorView> { it.provisionedCores / it.server.flavor.cpuCount } + override val comparator: Comparator<HostView> = + compareBy<HostView> { it.provisionedCores / it.host.model.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt index ac34f410..3facb182 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView import kotlin.random.Random /** @@ -33,13 +33,13 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al @OptIn(ExperimentalStdlibApi::class) override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? { + hypervisors: Set<HostView>, + server: Server + ): HostView? { return hypervisors.asIterable() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) - val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long) + val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } .randomOrNull(random) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt index 5312f4da..ed1dc662 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt @@ -23,8 +23,9 @@ package org.opendc.compute.simulator.allocation import mu.KotlinLogging -import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView +import org.opendc.compute.service.scheduler.AllocationPolicy private val logger = KotlinLogging.logger {} @@ -37,15 +38,15 @@ private val logger = KotlinLogging.logger {} public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( - hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView - ): HypervisorView? { - val clusterName = vmPlacements[image.name] - ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}") - val machinesInCluster = hypervisors.filter { it.server.name.contains(clusterName) } + hypervisors: Set<HostView>, + server: Server + ): HostView? { + val clusterName = vmPlacements[server.name] + ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") + val machinesInCluster = hypervisors.filter { it.host.name.contains(clusterName) } if (machinesInCluster.isEmpty()) { - logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." } + logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." } return hypervisors.maxByOrNull { it.availableMemory } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index f52d0f97..d7d5f002 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -30,7 +30,8 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-compute:opendc-compute-service")) + api(project(":opendc-metal")) api(project(":opendc-simulator:opendc-simulator-compute")) api(project(":opendc-simulator:opendc-simulator-failures")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt deleted file mode 100644 index 153a86b3..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.core.Server -import org.opendc.simulator.compute.SimExecutionContext - -/** - * Extended [SimExecutionContext] in which workloads within the OpenDC Compute module run. - */ -public interface ComputeSimExecutionContext : SimExecutionContext { - /** - * The server on which the image runs. - */ - public val server: Server -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt deleted file mode 100644 index 1a79523e..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import java.util.UUID - -public class HypervisorView( - public val uid: UUID, - public var server: Server, - public var numberOfActiveServers: Int, - public var availableMemory: Long, - public var provisionedCores: Int -) { - public lateinit var driver: VirtDriver -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt index 1e459e6f..2405a8f9 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt @@ -24,31 +24,23 @@ package org.opendc.compute.simulator import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.image.EmptyImage -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.NodeEvent -import org.opendc.compute.core.metal.NodeState -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image import org.opendc.compute.simulator.power.api.CpuPowerModel import org.opendc.compute.simulator.power.api.Powerable import org.opendc.compute.simulator.power.models.ConstantPowerModel -import org.opendc.core.services.ServiceRegistry +import org.opendc.metal.Node +import org.opendc.metal.NodeEvent +import org.opendc.metal.NodeState +import org.opendc.metal.driver.BareMetalDriver import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.SimExecutionContext import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.workload.SimResourceCommand import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow import org.opendc.utils.flow.StateFlow import java.time.Clock import java.util.UUID -import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -88,7 +80,7 @@ public class SimBareMetalDriver( * The machine state. */ private val nodeState = - StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events)) /** * The [SimBareMetalMachine] we use to run the workload. @@ -103,20 +95,10 @@ public class SimBareMetalDriver( override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this) /** - * The internal random instance. - */ - private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - - /** * The [Job] that runs the simulated workload. */ private var job: Job? = null - /** - * The event stream to publish to for the server. - */ - private var serverEvents: EventFlow<ServerEvent>? = null - override suspend fun init(): Node { return nodeState.value } @@ -127,51 +109,13 @@ public class SimBareMetalDriver( return node } - val events = EventFlow<ServerEvent>() - serverEvents = events - val server = Server( - UUID(random.nextLong(), random.nextLong()), - node.name, - emptyMap(), - flavor, - node.image, - ServerState.BUILD, - ServiceRegistry().put(BareMetalDriver, this@SimBareMetalDriver), - events - ) - - val delegate = (node.image as SimWorkloadImage).workload - // Wrap the workload to pass in a ComputeSimExecutionContext - val workload = object : SimWorkload { - lateinit var wrappedCtx: ComputeSimExecutionContext - - override fun onStart(ctx: SimExecutionContext) { - wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { - override val server: Server - get() = nodeState.value.server!! - - override fun toString(): String = "WrappedSimExecutionContext" - } - - delegate.onStart(wrappedCtx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return delegate.onStart(wrappedCtx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return delegate.onNext(wrappedCtx, cpu, remainingWork) - } - - override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)" - } + val workload = node.image.tags["workload"] as SimWorkload job = coroutineScope.launch { delay(1) // TODO Introduce boot time initMachine() try { - machine.run(workload) + machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node)) exitMachine(null) } catch (_: CancellationException) { // Ignored @@ -180,31 +124,21 @@ public class SimBareMetalDriver( } } - setNode(node.copy(state = NodeState.BOOT, server = server)) + setNode(node.copy(state = NodeState.BOOT)) return nodeState.value } private fun initMachine() { - val server = nodeState.value.server?.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) + setNode(nodeState.value.copy(state = NodeState.ACTIVE)) } private fun exitMachine(cause: Throwable?) { - val newServerState = - if (cause == null) - ServerState.SHUTOFF - else - ServerState.ERROR val newNodeState = if (cause == null) - nodeState.value.state + NodeState.SHUTOFF else NodeState.ERROR - val server = nodeState.value.server?.copy(state = newServerState) - setNode(nodeState.value.copy(state = newNodeState, server = server)) - - serverEvents?.close() - serverEvents = null + setNode(nodeState.value.copy(state = newNodeState)) } override suspend fun stop(): Node { @@ -214,7 +148,7 @@ public class SimBareMetalDriver( } job?.cancelAndJoin() - setNode(node.copy(state = NodeState.SHUTOFF, server = null)) + setNode(node.copy(state = NodeState.SHUTOFF)) return node } @@ -236,13 +170,6 @@ public class SimBareMetalDriver( events.emit(NodeEvent.StateChanged(value, field.state)) } - val oldServer = field.server - val newServer = value.server - - if (oldServer != null && newServer != null && oldServer.state != newServer.state) { - serverEvents?.emit(ServerEvent.StateChanged(newServer, oldServer.state)) - } - nodeState.value = value } @@ -250,13 +177,11 @@ public class SimBareMetalDriver( get() = coroutineScope override suspend fun fail() { - val server = nodeState.value.server?.copy(state = ServerState.ERROR) - setNode(nodeState.value.copy(state = NodeState.ERROR, server = server)) + setNode(nodeState.value.copy(state = NodeState.ERROR)) } override suspend fun recover() { - val server = nodeState.value.server?.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) + setNode(nodeState.value.copy(state = NodeState.ACTIVE)) } override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})" diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt new file mode 100644 index 00000000..fd547d3d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -0,0 +1,303 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import mu.KotlinLogging +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.* +import org.opendc.metal.Node +import org.opendc.simulator.compute.* +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.utils.flow.EventFlow +import java.util.* +import kotlin.coroutines.resume + +/** + * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. + */ +public class SimHost( + public val node: Node, + private val coroutineScope: CoroutineScope, + hypervisor: SimHypervisorProvider +) : Host, SimWorkload { + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The execution context in which the [Host] runs. + */ + private lateinit var ctx: SimExecutionContext + + override val events: Flow<HostEvent> + get() = _events + internal val _events = EventFlow<HostEvent>() + + /** + * The event listeners registered with this host. + */ + private val listeners = mutableListOf<HostListener>() + + /** + * Current total memory use of the images on this hypervisor. + */ + private var availableMemory: Long = 0 + + /** + * The hypervisor to run multiple workloads. + */ + private val hypervisor = hypervisor.create( + object : SimHypervisor.Listener { + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + _events.emit( + HostEvent.SliceFinished( + this@SimHost, + requestedWork, + grantedWork, + overcommittedWork, + interferedWork, + cpuUsage, + cpuDemand, + guests.size + ) + ) + } + } + ) + + /** + * The virtual machines running on the hypervisor. + */ + private val guests = HashMap<Server, Guest>() + + override val uid: UUID + get() = node.uid + + override val name: String + get() = node.name + + override val model: HostModel + get() = HostModel(node.flavor.cpuCount, node.flavor.memorySize) + + override val state: HostState + get() = _state + private var _state: HostState = HostState.UP + set(value) { + listeners.forEach { it.onStateChanged(this, value) } + field = value + } + + override fun canFit(server: Server): Boolean { + val sufficientMemory = availableMemory > server.flavor.memorySize + val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount + val canFit = hypervisor.canFit(server.flavor.toMachineModel()) + + return sufficientMemory && enoughCpus && canFit + } + + override suspend fun spawn(server: Server, start: Boolean) { + // Return if the server already exists on this host + if (server in this) { + return + } + + require(canFit(server)) { "Server does not fit" } + val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) + guests[server] = guest + + if (start) { + guest.start() + } + + _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + override fun contains(server: Server): Boolean { + return server in guests + } + + override suspend fun start(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.start() + } + + override suspend fun stop(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.stop() + } + + override suspend fun terminate(server: Server) { + val guest = guests.remove(server) ?: return + guest.terminate() + } + + override fun addListener(listener: HostListener) { + listeners.add(listener) + } + + override fun removeListener(listener: HostListener) { + listeners.remove(listener) + } + + /** + * Convert flavor to machine model. + */ + private fun Flavor.toMachineModel(): SimMachineModel { + val originalCpu = ctx.machine.cpus[0] + val processingNode = originalCpu.node.copy(coreCount = cpuCount) + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return SimMachineModel(processingUnits, memoryUnits) + } + + private fun onGuestStart(vm: Guest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStart(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + } + + private fun onGuestStop(vm: Guest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStop(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + + _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + /** + * A virtual machine instance that the driver manages. + */ + private inner class Guest(val server: Server, val machine: SimMachine) { + val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + + var state: ServerState = ServerState.SHUTOFF + + suspend fun start() { + when (state) { + ServerState.SHUTOFF -> { + logger.info { "User requested to start server ${server.uid}" } + launch() + } + ServerState.ACTIVE -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun stop() { + when (state) { + ServerState.ACTIVE, ServerState.ERROR -> { + val job = job ?: throw IllegalStateException("Server should be active") + job.cancel() + job.join() + } + ServerState.SHUTOFF -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun terminate() { + stop() + } + + private var job: Job? = null + + private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> + assert(job == null) { "Concurrent job running" } + val workload = server.image.tags["workload"] as SimWorkload + + val job = coroutineScope.launch { + delay(1) // TODO Introduce boot time + init() + cont.resume(Unit) + try { + machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) + exit(null) + } catch (cause: Throwable) { + exit(cause) + } finally { + machine.close() + } + } + this.job = job + job.invokeOnCompletion { + this.job = null + } + } + + private fun init() { + state = ServerState.ACTIVE + onGuestStart(this) + } + + private fun exit(cause: Throwable?) { + state = + if (cause == null) + ServerState.SHUTOFF + else + ServerState.ERROR + + availableMemory += server.flavor.memorySize + onGuestStop(this) + } + } + + override fun onStart(ctx: SimExecutionContext) { + this.ctx = ctx + this.availableMemory = ctx.machine.memory.map { it.size }.sum() + this.hypervisor.onStart(ctx) + } + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return hypervisor.onStart(ctx, cpu) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + return hypervisor.onNext(ctx, cpu, remainingWork) + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt new file mode 100644 index 00000000..bb03777b --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import kotlinx.coroutines.* +import org.opendc.compute.api.Image +import org.opendc.compute.service.driver.Host +import org.opendc.metal.Node +import org.opendc.metal.service.ProvisioningService +import org.opendc.simulator.compute.SimHypervisorProvider +import kotlin.coroutines.CoroutineContext + +/** + * A helper class to provision [SimHost]s on top of bare-metal machines using the [ProvisioningService]. + * + * @param context The [CoroutineContext] to use. + * @param metal The [ProvisioningService] to use. + * @param hypervisor The type of hypervisor to use. + */ +public class SimHostProvisioner( + private val context: CoroutineContext, + private val metal: ProvisioningService, + private val hypervisor: SimHypervisorProvider +) : AutoCloseable { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context) + + /** + * Provision all machines with a host. + */ + public suspend fun provisionAll(): List<Host> = coroutineScope { + metal.nodes().map { node -> async { provision(node) } }.awaitAll() + } + + /** + * Provision the specified [Node]. + */ + public suspend fun provision(node: Node): Host = coroutineScope { + val host = SimHost(node, scope, hypervisor) + metal.deploy(node, Image(node.uid, node.name, mapOf("workload" to host))) + host + } + + override fun close() { + scope.cancel() + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt deleted file mode 100644 index d7a8a8b2..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import org.opendc.compute.core.* -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.core.services.ServiceRegistry -import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.utils.flow.EventFlow -import java.util.* - -/** - * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor]. - */ -public class SimVirtDriver(private val coroutineScope: CoroutineScope, hypervisor: SimHypervisorProvider) : VirtDriver, SimWorkload { - /** - * The execution context in which the [VirtDriver] runs. - */ - private lateinit var ctx: ComputeSimExecutionContext - - /** - * The server hosting this hypervisor. - */ - public val server: Server - get() = ctx.server - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<HypervisorEvent>() - - override val events: Flow<HypervisorEvent> = eventFlow - - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = 0 - - /** - * The hypervisor to run multiple workloads. - */ - private val hypervisor = hypervisor.create( - object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - eventFlow.emit( - HypervisorEvent.SliceFinished( - this@SimVirtDriver, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - vms.size, - ctx.server - ) - ) - } - } - ) - - /** - * The virtual machines running on the hypervisor. - */ - private val vms = HashSet<VirtualMachine>() - - override fun canFit(flavor: Flavor): Boolean { - val sufficientMemory = availableMemory > flavor.memorySize - val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount - val canFit = hypervisor.canFit(flavor.toMachineModel()) - - return sufficientMemory && enoughCpus && canFit - } - - override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server { - val requiredMemory = flavor.memorySize - if (availableMemory - requiredMemory < 0) { - throw InsufficientMemoryOnServerException() - } - require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" } - - val events = EventFlow<ServerEvent>() - val server = Server( - UUID.randomUUID(), - name, - emptyMap(), - flavor, - image, - ServerState.BUILD, - ServiceRegistry(), - events - ) - availableMemory -= requiredMemory - - val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel())) - vms.add(vm) - vmStarted(vm) - eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) - return server - } - - /** - * Convert flavor to machine model. - */ - private fun Flavor.toMachineModel(): SimMachineModel { - val originalCpu = ctx.machine.cpus[0] - val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - - return SimMachineModel(processingUnits, memoryUnits) - } - - private fun vmStarted(vm: VirtualMachine) { - vms.forEach { it -> - vm.performanceInterferenceModel?.onStart(it.server.image.name) - } - } - - private fun vmStopped(vm: VirtualMachine) { - vms.forEach { it -> - vm.performanceInterferenceModel?.onStop(it.server.image.name) - } - } - - /** - * A virtual machine instance that the driver manages. - */ - private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - - val job = coroutineScope.launch { - val delegate = (server.image as SimWorkloadImage).workload - // Wrap the workload to pass in a ComputeSimExecutionContext - val workload = object : SimWorkload { - lateinit var wrappedCtx: ComputeSimExecutionContext - - override fun onStart(ctx: SimExecutionContext) { - wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx { - override val server: Server - get() = server - - override fun toString(): String = "WrappedSimExecutionContext" - } - - delegate.onStart(wrappedCtx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return delegate.onStart(wrappedCtx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return delegate.onNext(wrappedCtx, cpu, remainingWork) - } - - override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)" - } - - delay(1) // TODO Introduce boot time - init() - try { - machine.run(workload) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - machine.close() - } - } - - var server: Server = server - set(value) { - if (field.state != value.state) { - events.emit(ServerEvent.StateChanged(value, field.state)) - } - - field = value - } - - private fun init() { - server = server.copy(state = ServerState.ACTIVE) - } - - private fun exit(cause: Throwable?) { - val serverState = - if (cause == null) - ServerState.SHUTOFF - else - ServerState.ERROR - server = server.copy(state = serverState) - availableMemory += server.flavor.memorySize - vms.remove(this) - vmStopped(this) - eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory)) - events.close() - } - } - - override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx as ComputeSimExecutionContext - this.availableMemory = ctx.machine.memory.map { it.size }.sum() - this.hypervisor.onStart(ctx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return hypervisor.onStart(ctx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return hypervisor.onNext(ctx, cpu, remainingWork) - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt deleted file mode 100644 index defea888..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ /dev/null @@ -1,418 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.virt.service.VirtProvisioningService -import org.opendc.compute.core.virt.service.events.* -import org.opendc.compute.simulator.allocation.AllocationPolicy -import org.opendc.simulator.compute.SimHypervisorProvider -import org.opendc.trace.core.EventTracer -import org.opendc.utils.TimerScheduler -import org.opendc.utils.flow.EventFlow -import java.time.Clock -import java.util.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.math.max - -@OptIn(ExperimentalCoroutinesApi::class) -public class SimVirtProvisioningService( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - private val provisioningService: ProvisioningService, - public val allocationPolicy: AllocationPolicy, - private val tracer: EventTracer, - private val hypervisor: SimHypervisorProvider, - private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds -) : VirtProvisioningService { - /** - * The logger instance to use. - */ - private val logger = KotlinLogging.logger {} - - /** - * The hypervisors that have been launched by the service. - */ - private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf() - - /** - * The available hypervisors. - */ - private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf() - - /** - * The incoming images to be processed by the provisioner. - */ - private val incomingImages: Deque<ImageView> = ArrayDeque() - - /** - * The active images in the system. - */ - private val activeImages: MutableSet<ImageView> = mutableSetOf() - - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var finishedVms: Int = 0 - public var unscheduledVms: Int = 0 - - private var maxCores = 0 - private var maxMemory = 0L - - /** - * The allocation logic to use. - */ - private val allocationLogic = allocationPolicy() - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<VirtProvisioningEvent>() - - override val events: Flow<VirtProvisioningEvent> = eventFlow - - /** - * The [TimerScheduler] to use for scheduling the scheduler cycles. - */ - private var scheduler: TimerScheduler<Unit> = TimerScheduler(coroutineScope, clock) - - init { - coroutineScope.launch { - val provisionedNodes = provisioningService.nodes() - provisionedNodes.forEach { node -> - val workload = SimVirtDriver(coroutineScope, hypervisor) - val hypervisorImage = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), workload) - launch { - var init = false - val deployedNode = provisioningService.deploy(node, hypervisorImage) - val server = deployedNode.server!! - server.events.onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - if (!init) { - init = true - } - stateChanged(event.server) - } - } - }.launchIn(this) - - delay(1) - onHypervisorAvailable(server, workload) - } - } - } - } - - override suspend fun drivers(): Set<VirtDriver> { - return availableHypervisors.map { it.driver }.toSet() - } - - override val hostCount: Int = hypervisors.size - - override suspend fun deploy( - name: String, - image: Image, - flavor: Flavor - ): Server { - tracer.commit(VmSubmissionEvent(name, image, flavor)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - ) - ) - - return suspendCancellableCoroutine<Server> { cont -> - val vmInstance = ImageView(name, image, flavor, cont) - incomingImages += vmInstance - requestCycle() - } - } - - override suspend fun terminate() { - val provisionedNodes = provisioningService.nodes() - provisionedNodes.forEach { node -> provisioningService.stop(node) } - } - - private fun requestCycle() { - // Bail out in case we have already requested a new cycle. - if (scheduler.isTimerActive(Unit)) { - return - } - - // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). - // This is important because the slices of the VMs need to be aligned. - // We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) - - scheduler.startSingleTimer(Unit, delay) { - coroutineScope.launch { schedule() } - } - } - - private suspend fun schedule() { - while (incomingImages.isNotEmpty()) { - val imageInstance = incomingImages.peekFirst() - val requiredMemory = imageInstance.flavor.memorySize - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) - - if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) { - logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." } - - if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - tracer.commit(VmSubmissionInvalidEvent(imageInstance.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - --queuedVms, - ++unscheduledVms - ) - ) - - // Remove the incoming image - incomingImages.poll() - - logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") - continue - } else { - break - } - } - - try { - logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } - incomingImages.poll() - - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += imageInstance.flavor.cpuCount - selectedHv.availableMemory -= requiredMemory // XXX Temporary hack - - val server = selectedHv.driver.spawn( - imageInstance.name, - imageInstance.image, - imageInstance.flavor - ) - imageInstance.server = server - imageInstance.continuation.resume(server) - - tracer.commit(VmScheduledEvent(imageInstance.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) - ) - activeImages += imageInstance - - server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - tracer.commit(VmStoppedEvent(event.server.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - ) - ) - - activeImages -= imageInstance - selectedHv.provisionedCores -= server.flavor.cpuCount - - // Try to reschedule if needed - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - } - } - } - .launchIn(coroutineScope) - } catch (e: InsufficientMemoryOnServerException) { - logger.error("Failed to deploy VM", e) - - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= imageInstance.flavor.cpuCount - selectedHv.availableMemory += requiredMemory - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) - } - } - } - - private fun stateChanged(server: Server) { - when (server.state) { - ServerState.ACTIVE -> { - logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" } - - if (server in hypervisors) { - // Corner case for when the hypervisor already exists - availableHypervisors += hypervisors.getValue(server) - } else { - val hv = HypervisorView( - server.uid, - server, - 0, - server.flavor.memorySize, - 0 - ) - maxCores = max(maxCores, server.flavor.cpuCount) - maxMemory = max(maxMemory, server.flavor.memorySize) - hypervisors[server] = hv - } - - tracer.commit(HypervisorAvailableEvent(server.uid)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - - // Re-schedule on the new machine - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - ServerState.SHUTOFF, ServerState.ERROR -> { - logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } - val hv = hypervisors[server] ?: return - availableHypervisors -= hv - - tracer.commit(HypervisorUnavailableEvent(hv.uid)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - else -> throw IllegalStateException() - } - } - - private fun onHypervisorAvailable(server: Server, hypervisor: SimVirtDriver) { - val hv = hypervisors[server] ?: return - hv.driver = hypervisor - availableHypervisors += hv - - tracer.commit(HypervisorAvailableEvent(hv.uid)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - - hv.driver.events - .onEach { event -> - if (event is HypervisorEvent.VmsUpdated) { - hv.numberOfActiveServers = event.numberOfActiveServers - hv.availableMemory = event.availableMemory - } - }.launchIn(coroutineScope) - - requestCycle() - } - - public data class ImageView( - public val name: String, - public val image: Image, - public val flavor: Flavor, - public val continuation: Continuation<Server>, - public var server: Server? = null - ) -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt deleted file mode 100644 index b48de1d5..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import org.opendc.compute.core.image.Image -import org.opendc.core.resource.TagContainer -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.* - -/** - * An application [Image] that runs a [SimWorkload]. - * - * @property uid The unique identifier of this image. - * @property name The name of this image. - * @property tags The tags attached to the image. - * @property workload The workload to run for this image. - */ -public data class SimWorkloadImage( - public override val uid: UUID, - public override val name: String, - public override val tags: TagContainer, - public val workload: SimWorkload -) : Image diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt index ee9e130b..0141bc8c 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt @@ -2,7 +2,7 @@ package org.opendc.compute.simulator.power.api import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.metal.driver.BareMetalDriver public interface CpuPowerModel { /** diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt index 938e5607..b0c3fa4c 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt @@ -5,10 +5,10 @@ import org.opendc.compute.simulator.power.api.CpuPowerModel /** * A decorator for ignoring the idle power when computing energy consumption of components. * - * @param cpuModelWrappee The wrappe of a [CpuPowerModel]. + * @param delegate The [CpuPowerModel] to delegate to. */ -public class ZeroIdlePowerDecorator(private val cpuModelWrappee: CpuPowerModel) : CpuPowerModel { +public class ZeroIdlePowerDecorator(private val delegate: CpuPowerModel) : CpuPowerModel { override fun computeCpuPower(cpuUtil: Double): Double { - return if (cpuUtil == 0.0) 0.0 else cpuModelWrappee.computeCpuPower(cpuUtil) + return if (cpuUtil == 0.0) 0.0 else delegate.computeCpuPower(cpuUtil) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt index fb8a5f47..0d90376e 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt @@ -30,8 +30,10 @@ import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState +import org.junit.jupiter.api.assertAll +import org.opendc.compute.api.Image +import org.opendc.metal.NodeEvent +import org.opendc.metal.NodeState import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -59,22 +61,21 @@ internal class SimBareMetalDriverTest { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) - var finalState: ServerState = ServerState.BUILD + var finalState: NodeState = NodeState.UNKNOWN var finalTime = 0L testScope.launch { val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(4_000, utilization = 1.0)) - + val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to SimFlopsWorkload(4_000, utilization = 1.0))) // Batch driver commands withContext(coroutineContext) { driver.init() driver.setImage(image) - val server = driver.start().server!! - server.events.collect { event -> + val node = driver.start() + node.events.collect { event -> when (event) { - is ServerEvent.StateChanged -> { - finalState = event.server.state + is NodeEvent.StateChanged -> { + finalState = event.node.state finalTime = clock.millis() } } @@ -83,7 +84,9 @@ internal class SimBareMetalDriverTest { } testScope.advanceUntilIdle() - assertEquals(ServerState.SHUTOFF, finalState) - assertEquals(501, finalTime) + assertAll( + { assertEquals(NodeState.SHUTOFF, finalState) }, + { assertEquals(501, finalTime) } + ) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 1831eae0..61bff39f 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,6 +24,7 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -32,8 +33,14 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.service.driver.HostEvent +import org.opendc.metal.Node +import org.opendc.metal.NodeState import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -48,7 +55,7 @@ import java.util.UUID * Basic test-suite for the hypervisor. */ @OptIn(ExperimentalCoroutinesApi::class) -internal class SimVirtDriverTest { +internal class SimHostTest { private lateinit var scope: TestCoroutineScope private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @@ -75,35 +82,42 @@ internal class SimVirtDriverTest { var grantedWork = 0L var overcommittedWork = 0L + val node = Node( + UUID.randomUUID(), "name", emptyMap(), NodeState.SHUTOFF, + Flavor(machineModel.cpus.size, machineModel.memory.map { it.size }.sum()), Image.EMPTY, emptyFlow() + ) + scope.launch { - val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider()) - val vmm = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), virtDriver) + val virtDriver = SimHost(node, this, SimFairShareHypervisorProvider()) + val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver)) val duration = 5 * 60L - val vmImageA = SimWorkloadImage( + val vmImageA = Image( UUID.randomUUID(), "<unnamed>", - emptyMap(), - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) - ), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) + ), + ) ) ) - val vmImageB = SimWorkloadImage( + val vmImageB = Image( UUID.randomUUID(), "<unnamed>", - emptyMap(), - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) + ) ) - ), + ) ) val metalDriver = @@ -119,7 +133,7 @@ internal class SimVirtDriverTest { virtDriver.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> { + is HostEvent.SliceFinished -> { requestedWork += event.requestedBurst grantedWork += event.grantedBurst overcommittedWork += event.overcommissionedBurst @@ -128,8 +142,8 @@ internal class SimVirtDriverTest { } .launchIn(this) - virtDriver.spawn("a", vmImageA, flavor) - virtDriver.spawn("b", vmImageB, flavor) + launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } + launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } } scope.advanceUntilIdle() @@ -142,4 +156,20 @@ internal class SimVirtDriverTest { { assertEquals(1200006, scope.currentTime) } ) } + + private class MockServer( + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image + ) : Server { + override val tags: Map<String, String> = emptyMap() + override val state: ServerState = ServerState.BUILD + + override fun watch(watcher: ServerWatcher) {} + + override fun unwatch(watcher: ServerWatcher) {} + + override suspend fun refresh() {} + } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt index a33a4e5f..33b3db94 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt @@ -29,7 +29,8 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.opendc.compute.core.metal.service.SimpleProvisioningService +import org.opendc.compute.api.Image +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -64,7 +65,7 @@ internal class SimProvisioningServiceTest { val clock = DelayControllerClockAdapter(testScope) testScope.launch { - val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(1000)) + val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("machine" to SimFlopsWorkload(1000))) val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) val provisioner = SimpleProvisioningService() @@ -72,7 +73,7 @@ internal class SimProvisioningServiceTest { delay(5) val nodes = provisioner.nodes() val node = provisioner.deploy(nodes.first(), image) - node.server!!.events.collect { println(it) } + node.events.collect { println(it) } } testScope.advanceUntilIdle() diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt index 7b0c7515..d4d88fb1 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt @@ -7,9 +7,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource -import org.opendc.compute.core.metal.driver.BareMetalDriver import org.opendc.compute.simulator.power.api.CpuPowerModel import org.opendc.compute.simulator.power.models.* +import org.opendc.metal.driver.BareMetalDriver import java.util.stream.Stream import kotlin.math.pow diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 8f3e686a..a5cf4fc0 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,22 +32,26 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.driver.BareMetalDriver -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.compute.simulator.SimVirtDriver -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.metal.NODE_CLUSTER +import org.opendc.metal.NodeEvent +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.failures.CorrelatedFaultInjector @@ -135,6 +139,12 @@ public fun createTraceReader( ) } +public data class ProvisionerResult( + val metal: ProvisioningService, + val provisioner: SimHostProvisioner, + val compute: ComputeServiceImpl +) + /** * Construct the environment for a VM provisioner and return the provisioner instance. */ @@ -144,45 +154,52 @@ public suspend fun createProvisioner( environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): Pair<ProvisioningService, SimVirtProvisioningService> { +): ProvisionerResult { val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) + val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider()) + val hosts = provisioner.provisionAll() + + val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + + for (host in hosts) { + scheduler.addHost(host) + } // Wait for the hypervisors to be spawned delay(10) - return bareMetalProvisioner to scheduler + return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler) } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public suspend fun attachMonitor( +public fun attachMonitor( coroutineScope: CoroutineScope, clock: Clock, - scheduler: SimVirtProvisioningService, + scheduler: ComputeService, monitor: ExperimentMonitor ) { - val hypervisors = scheduler.drivers() + val hypervisors = scheduler.hosts // Monitor hypervisor events for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).server + // TODO Do not expose Host directly but use Hypervisor class. + val server = (hypervisor as SimHost).node monitor.reportHostStateChange(clock.millis(), hypervisor, server) server.events .onEach { event -> val time = clock.millis() when (event) { - is ServerEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.server) + is NodeEvent.StateChanged -> { + monitor.reportHostStateChange(time, hypervisor, event.node) } } } @@ -190,7 +207,7 @@ public suspend fun attachMonitor( hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + is HostEvent.SliceFinished -> monitor.reportHostSlice( clock.millis(), event.requestedBurst, event.grantedBurst, @@ -199,22 +216,22 @@ public suspend fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer + (event.driver as SimHost).node ) } } .launchIn(coroutineScope) - val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver + val driver = server.metadata["driver"] as SimBareMetalDriver driver.powerDraw - .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .onEach { monitor.reportPowerConsumption(server, it) } .launchIn(coroutineScope) } scheduler.events .onEach { event -> when (event) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> monitor.reportProvisionerMetrics(clock.millis(), event) } } @@ -227,11 +244,12 @@ public suspend fun attachMonitor( public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, - reader: TraceReader<VmWorkload>, - scheduler: SimVirtProvisioningService, + reader: TraceReader<ComputeWorkload>, + scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor ) { + val client = scheduler.newClient() try { var submitted = 0 @@ -242,7 +260,7 @@ public suspend fun processTrace( delay(max(0, time - clock.millis())) coroutineScope.launch { chan.send(Unit) - val server = scheduler.deploy( + val server = client.newServer( workload.image.name, workload.image, Flavor( @@ -250,21 +268,19 @@ public suspend fun processTrace( workload.image.tags["required-memory"] as Long ) ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(clock.millis(), it.server) - } + + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) } - .collect() + }) } } scheduler.events .takeWhile { when (it) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> it.inactiveVmCount + it.failedVmCount != submitted } } @@ -272,5 +288,6 @@ public suspend fun processTrace( delay(1) } finally { reader.close() + client.close() } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 75b0d735..ff0a026d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { ) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 3c6637bf..1e42cf56 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -22,9 +22,11 @@ package org.opendc.experiments.capelin.monitor -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host +import org.opendc.metal.Node import java.io.Closeable /** @@ -34,22 +36,22 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a VM changes. */ - public fun reportVmStateChange(time: Long, server: Server) {} + public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} /** * This method is invoked when the state of a host changes. */ public fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { } /** * Report the power consumption of a host. */ - public fun reportPowerConsumption(host: Server, draw: Double) {} + public fun reportPowerConsumption(host: Node, draw: Double) {} /** * This method is invoked for a host for each slice that is finishes. @@ -63,7 +65,7 @@ public interface ExperimentMonitor : Closeable { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long = 5 * 60 * 1000L ) { } @@ -71,5 +73,5 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index a0d57656..98052214 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -23,13 +23,15 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter +import org.opendc.metal.Node import java.io.File /** @@ -49,10 +51,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private val currentHostEvent = mutableMapOf<Node, HostEvent>() private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) { + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { if (startTime < 0) { startTime = time @@ -63,12 +65,12 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: override fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } - val previousEvent = currentHostEvent[server] + val previousEvent = currentHostEvent[host] val roundedTime = previousEvent?.let { val duration = time - it.timestamp @@ -91,13 +93,13 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: 0.0, 0.0, 0, - server + host ) } - private val lastPowerConsumption = mutableMapOf<Server, Double>() + private val lastPowerConsumption = mutableMapOf<Node, Double>() - override fun reportPowerConsumption(host: Server, draw: Double) { + override fun reportPowerConsumption(host: Node, draw: Double) { lastPowerConsumption[host] = draw } @@ -110,16 +112,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { - val previousEvent = currentHostEvent[hostServer] + val previousEvent = currentHostEvent[host] when { previousEvent == null -> { val event = HostEvent( time, 5 * 60 * 1000L, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -127,17 +129,17 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } previousEvent.timestamp == time -> { val event = HostEvent( time, previousEvent.duration, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -145,11 +147,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } else -> { hostWriter.write(previousEvent) @@ -157,7 +159,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: val event = HostEvent( time, time - previousEvent.timestamp, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -165,16 +167,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } } } - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerWriter.write( ProvisionerEvent( time, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt index e5e9d520..e7b6a7bb 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.compute.core.Server +import org.opendc.metal.Node /** * A periodic report of the host machine metrics. @@ -30,7 +30,7 @@ import org.opendc.compute.core.Server public data class HostEvent( override val timestamp: Long, public val duration: Long, - public val host: Server, + public val node: Node, public val vmCount: Int, public val requestedBurst: Long, public val grantedBurst: Long, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt index 427c453a..7631f55f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.compute.core.Server +import org.opendc.compute.api.Server /** * A periodic report of a virtual machine's metrics. diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index 4a3e7963..b4fdd66a 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : // record.put("portfolio_id", event.run.parent.parent.id) // record.put("scenario_id", event.run.parent.id) // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) + record.put("host_id", event.node.name) + record.put("state", event.node.state.name) record.put("timestamp", event.timestamp) record.put("duration", event.duration) record.put("vm_count", event.vmCount) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt index 6cfdae40..f9630078 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.experiments.capelin.trace -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry @@ -45,11 +45,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, workload: Workload, seed: Int -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The iterator over the actual trace. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> = + private val iterator: Iterator<TraceEntry<ComputeWorkload>> = rawReaders .map { it.read() } .run { @@ -73,11 +73,10 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) val newImage = - SimWorkloadImage( + Image( image.uid, image.name, image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - (image as SimWorkloadImage).workload ) val newWorkload = entry.workload.copy(image = newImage) Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) @@ -88,7 +87,7 @@ public class Sc20ParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index d2560d62..b29bdc54 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -26,8 +26,8 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -108,11 +108,12 @@ public class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(vmFragments) + val vmWorkload = ComputeWorkload( uid, id, UnnamedUser, - SimWorkloadImage( + Image( uid, id, mapOf( @@ -120,9 +121,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { "end-time" to endTime, "total-load" to totalLoad, "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(vmFragments) + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) entries.add(TraceEntryImpl(submissionTime, vmWorkload)) @@ -150,7 +151,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<VmWorkload>> = entries + public fun read(): List<TraceEntry<ComputeWorkload>> = entries /** * An unnamed user. @@ -165,6 +166,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { */ internal data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index 12705c80..c588fda3 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -31,8 +31,8 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -62,11 +62,11 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * The intermediate buffer to store the read records in. @@ -235,19 +235,20 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), Random(random.nextInt()) ) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(fragments) + val vmWorkload = ComputeWorkload( uid, "VM Workload $id", UnnamedUser, - SimWorkloadImage( + Image( uid, id, mapOf( IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(fragments), + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) @@ -263,7 +264,7 @@ public class Sc20StreamingParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() { readerThread.interrupt() @@ -300,6 +301,6 @@ public class Sc20StreamingParquetTraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 4d9b9df1..881652f6 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -23,8 +23,8 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload @@ -38,11 +38,11 @@ private val logger = KotlinLogging.logger {} * Sample the workload for the specified [run]. */ public fun sampleWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { return when { workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) workload.samplingStrategy == SamplingStrategy.HPC -> @@ -58,15 +58,15 @@ public fun sampleWorkload( * Sample a regular (non-HPC) workload. */ public fun sampleRegularWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { val fraction = subWorkload.fraction val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { @@ -93,11 +93,11 @@ public fun sampleRegularWorkload( * Sample a HPC workload. */ public fun sampleHpcWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, seed: Int, sampleOnLoad: Boolean -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { val pattern = Regex("^vm__workload__(ComputeNode|cn).*") val random = Random(seed) @@ -109,7 +109,7 @@ public fun sampleHpcWorkload( val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() hpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -118,7 +118,7 @@ public fun sampleHpcWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() nonHpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -139,7 +139,7 @@ public fun sampleHpcWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() if (sampleOnLoad) { var currentLoad = 0.0 @@ -194,17 +194,16 @@ public fun sampleHpcWorkload( /** * Sample a random trace entry. */ -private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> { +private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> { val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = SimWorkloadImage( + val image = Image( id, entry.workload.image.name, - entry.workload.image.tags, - (entry.workload.image as SimWorkloadImage).workload + entry.workload.image.tags ) val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) return VmTraceEntry(vmWorkload, entry.submissionTime) } -private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : - TraceEntry<VmWorkload> +private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) : + TraceEntry<ComputeWorkload> diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 6a0796f6..dfc6b90b 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,10 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Server -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain import org.opendc.experiments.capelin.experiment.createProvisioner @@ -47,6 +46,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.metal.Node import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import java.io.File @@ -97,7 +97,7 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: SimVirtProvisioningService + lateinit var scheduler: ComputeServiceImpl val tracer = EventTracer(clock) testScope.launch { @@ -108,8 +108,8 @@ class CapelinIntegrationTest { allocationPolicy, tracer ) - val bareMetalProvisioner = res.first - scheduler = res.second + val bareMetalProvisioner = res.metal + scheduler = res.compute val failureDomain = if (failures) { println("ENABLING failures") @@ -138,8 +138,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() monitor.close() + res.provisioner.close() } runSimulation() @@ -148,9 +149,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1684849230562, monitor.totalRequestedBurst) }, - { assertEquals(447612683996, monitor.totalGrantedBurst) }, - { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, + { assertEquals(1678587333640, monitor.totalRequestedBurst) }, + { assertEquals(438118200924, monitor.totalGrantedBurst) }, + { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -162,19 +163,16 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - lateinit var scheduler: SimVirtProvisioningService val tracer = EventTracer(clock) testScope.launch { - val res = createProvisioner( + val (_, provisioner, scheduler) = createProvisioner( this, clock, environmentReader, allocationPolicy, tracer ) - scheduler = res.second - attachMonitor(this, clock, scheduler, monitor) processTrace( this, @@ -187,8 +185,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - scheduler.terminate() + scheduler.close() monitor.close() + provisioner.close() } runSimulation() @@ -210,7 +209,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> { return Sc20ParquetTraceReader( listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), emptyMap(), @@ -242,7 +241,7 @@ class CapelinIntegrationTest { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { totalRequestedBurst += requestedBurst diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index fc979363..7b9d70ed 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -24,13 +24,14 @@ package org.opendc.experiments.sc18 import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer @@ -91,16 +92,17 @@ public class UnderspecificationExperiment : Experiment("underspecification") { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService( - testScope, + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService( + testScope.coroutineContext, clock, - bareMetal, - NumberOfActiveServersAllocationPolicy(), tracer, - SimSpaceSharedHypervisorProvider(), - schedulingQuantum = 1000 + NumberOfActiveServersAllocationPolicy(), ) + hosts.forEach { compute.addHost(it) } + // Wait for the hypervisors to be spawned delay(10) @@ -108,7 +110,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") { testScope, clock, tracer, - provisioner, + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts index cd26c077..37e9c9c8 100644 --- a/simulator/opendc-format/build.gradle.kts +++ b/simulator/opendc-format/build.gradle.kts @@ -31,7 +31,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-core")) - api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-workflows")) implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation(project(":opendc-compute:opendc-compute-simulator")) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 2e3e4a73..bbbbe87c 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -26,14 +26,14 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.core.Environment import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 6ec8ba4a..998f9cd6 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -23,9 +23,6 @@ package org.opendc.format.environment.sc20 import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.core.Environment @@ -33,6 +30,9 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.NODE_CLUSTER +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index a58a2524..6cf65f7f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -26,8 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.core.Environment @@ -35,6 +33,8 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 90d751ea..1571b17d 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.format.trace.bitbrains -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -45,17 +45,17 @@ import kotlin.math.min public class BitbrainsTraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() var timestampCol = 0 var coreCol = 0 @@ -131,19 +131,20 @@ public class BitbrainsTraceReader( .toSortedSet() ) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(flopsHistory.asSequence()) + val vmWorkload = ComputeWorkload( uuid, "VM Workload $vmId", UnnamedUser, - SimWorkloadImage( + Image( uuid, vmId.toString(), mapOf( IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to cores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(flopsHistory.asSequence()) + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) entries[vmId] = TraceEntryImpl( @@ -158,7 +159,7 @@ public class BitbrainsTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} @@ -175,6 +176,6 @@ public class BitbrainsTraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index c76889c8..cd7aff3c 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -22,7 +22,7 @@ package org.opendc.format.trace.gwf -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -41,7 +41,6 @@ import kotlin.collections.List import kotlin.collections.MutableSet import kotlin.collections.component1 import kotlin.collections.component2 -import kotlin.collections.emptyMap import kotlin.collections.filter import kotlin.collections.forEach import kotlin.collections.getOrPut @@ -136,10 +135,11 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) } val workflow = entry.workload + val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)), + Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)), HashSet(), mapOf( WORKFLOW_TASK_CORES to cores, diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index 78f581ca..07785632 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.format.trace.sc20 -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -49,17 +49,17 @@ public class Sc20TraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>() + val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>() val timestampCol = 0 val cpuUsageCol = 1 @@ -156,19 +156,20 @@ public class Sc20TraceReader( performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), Random(random.nextInt()) ) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(flopsFragments.asSequence()) + val vmWorkload = ComputeWorkload( uuid, "VM Workload $vmId", UnnamedUser, - SimWorkloadImage( + Image( uuid, vmId, mapOf( IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to cores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(flopsFragments.asSequence()) + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) entries[uuid] = TraceEntryImpl( @@ -183,7 +184,7 @@ public class Sc20TraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} @@ -200,6 +201,6 @@ public class Sc20TraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index 80c54354..ead20c35 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.format.trace.swf -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -43,17 +43,17 @@ import java.util.* public class SwfTraceReader( file: File, maxNumCores: Int = -1 -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() val jobNumberCol = 0 val submitTimeCol = 1 // seconds (begin of trace is 0) @@ -154,18 +154,19 @@ public class SwfTraceReader( } val uuid = UUID(0L, jobNumber) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(flopsHistory.asSequence()) + val vmWorkload = ComputeWorkload( uuid, "SWF Workload $jobNumber", UnnamedUser, - SimWorkloadImage( + Image( uuid, jobNumber.toString(), mapOf( "cores" to cores, - "required-memory" to memory - ), - SimTraceWorkload(flopsHistory.asSequence()) + "required-memory" to memory, + "workload" to workload + ) ) ) @@ -179,7 +180,7 @@ public class SwfTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} @@ -196,6 +197,6 @@ public class SwfTraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index d7dc09fa..5a271fab 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,7 +25,7 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -78,10 +78,17 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) } val workflow = entry.workload + val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)), + Image( + UUID.randomUUID(), + "<unnamed>", + mapOf( + "workload" to workload + ) + ), HashSet(), mapOf( WORKFLOW_TASK_CORES to cores, diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt index 45c125c4..7e3d2623 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -24,7 +24,6 @@ package org.opendc.format.trace.swf import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import org.opendc.compute.simulator.SimWorkloadImage import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File @@ -35,12 +34,12 @@ class SwfTraceReaderTest { var entry = reader.next() assertEquals(0, entry.submissionTime) // 1961 slices for waiting, 3 full and 1 partial running slices - assertEquals(1965, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size) + assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) entry = reader.next() assertEquals(164472, entry.submissionTime) // 1188 slices for waiting, 0 full and 1 partial running slices - assertEquals(1189, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size) - assertEquals(0.25, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().usage) + assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) + assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage) } } diff --git a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts b/simulator/opendc-metal/build.gradle.kts index a1b6ec0f..9207de18 100644 --- a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts +++ b/simulator/opendc-metal/build.gradle.kts @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Core implementation of the OpenDC Compute service" +description = "Bare-metal provisioning in OpenDC" /* Build configuration */ plugins { @@ -30,6 +30,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-core")) + api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Metadata.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt index 11eadd87..ca98dab0 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Metadata.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal /* * Common metadata keys for bare-metal nodes. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt index 6d9506f1..1c5c7a8d 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Image import org.opendc.core.Identity import java.util.UUID @@ -53,14 +53,14 @@ public data class Node( public val state: NodeState, /** - * The boot image of the node. + * The flavor of the node. */ - public val image: Image, + public val flavor: Flavor, /** - * The server instance that is running on the node or `null` if no server is running. + * The boot image of the node. */ - public val server: Server?, + public val image: Image, /** * The events that are emitted by the node. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeEvent.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt index 4423e2bf..30ce423c 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeEvent.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal /** * An event that is emitted by a [Node]. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeState.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt index bdc4841e..f1d4ea2e 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeState.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal +package org.opendc.metal /** * An enumeration describing the possible states of a bare-metal compute node. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/driver/BareMetalDriver.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt index 9db57127..3b15be94 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/driver/BareMetalDriver.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,13 +20,13 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal.driver +package org.opendc.metal.driver import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node +import org.opendc.compute.api.Image +import org.opendc.compute.api.Server import org.opendc.core.services.AbstractServiceKey +import org.opendc.metal.Node import java.util.UUID /** diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/ProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt index bad5b47c..6548767e 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/ProvisioningService.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal.service +package org.opendc.metal.service -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.api.Image import org.opendc.core.services.AbstractServiceKey +import org.opendc.metal.Node +import org.opendc.metal.driver.BareMetalDriver import java.util.UUID /** diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/SimpleProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt index 5222f2fb..2d6353c8 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/SimpleProvisioningService.kt +++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.core.metal.service +package org.opendc.metal.service import kotlinx.coroutines.CancellationException -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.api.Image +import org.opendc.metal.Node +import org.opendc.metal.driver.BareMetalDriver /** * A very basic implementation of the [ProvisioningService]. diff --git a/simulator/opendc-platform/build.gradle.kts b/simulator/opendc-platform/build.gradle.kts index 0ae1c72a..ea0591ad 100644 --- a/simulator/opendc-platform/build.gradle.kts +++ b/simulator/opendc-platform/build.gradle.kts @@ -33,5 +33,6 @@ dependencies { api("io.github.microutils:kotlin-logging:${versions.kotlinLogging}") runtime("org.slf4j:slf4j-simple:${versions.slf4j}") + runtime("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}") } } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 533bf321..482fe754 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -40,6 +40,11 @@ import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document import org.bson.types.ObjectId +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -242,7 +247,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val tracer = EventTracer(clock) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -281,7 +286,8 @@ public class RunnerCli : CliktCommand(name = "runner") { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index f43d0869..2f11347d 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -32,9 +32,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document import org.bson.types.ObjectId -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.metal.service.SimpleProvisioningService import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.core.Environment @@ -42,6 +39,9 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.metal.NODE_CLUSTER +import org.opendc.metal.service.ProvisioningService +import org.opendc.metal.service.SimpleProvisioningService import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index f16f9b90..fe814c76 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -23,12 +23,14 @@ package org.opendc.runner.web import mu.KotlinLogging -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.telemetry.HostEvent +import org.opendc.metal.Node +import org.opendc.metal.NodeState import kotlin.math.max /** @@ -36,10 +38,10 @@ import kotlin.math.max */ public class WebExperimentMonitor : ExperimentMonitor { private val logger = KotlinLogging.logger {} - private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private val currentHostEvent = mutableMapOf<Node, HostEvent>() private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) { + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { if (startTime < 0) { startTime = time @@ -50,12 +52,12 @@ public class WebExperimentMonitor : ExperimentMonitor { override fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } - val previousEvent = currentHostEvent[server] + val previousEvent = currentHostEvent[host] val roundedTime = previousEvent?.let { val duration = time - it.timestamp @@ -78,13 +80,13 @@ public class WebExperimentMonitor : ExperimentMonitor { 0.0, 0.0, 0, - server + host ) } - private val lastPowerConsumption = mutableMapOf<Server, Double>() + private val lastPowerConsumption = mutableMapOf<Node, Double>() - override fun reportPowerConsumption(host: Server, draw: Double) { + override fun reportPowerConsumption(host: Node, draw: Double) { lastPowerConsumption[host] = draw } @@ -97,16 +99,16 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { - val previousEvent = currentHostEvent[hostServer] + val previousEvent = currentHostEvent[host] when { previousEvent == null -> { val event = HostEvent( time, 5 * 60 * 1000L, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -114,17 +116,17 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } previousEvent.timestamp == time -> { val event = HostEvent( time, previousEvent.duration, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -132,11 +134,11 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } else -> { processHostEvent(previousEvent) @@ -144,7 +146,7 @@ public class WebExperimentMonitor : ExperimentMonitor { val event = HostEvent( time, time - previousEvent.timestamp, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -152,17 +154,17 @@ public class WebExperimentMonitor : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } } } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap<Server, HostMetrics> = mutableMapOf() + private val hostMetrics: MutableMap<Node, HostMetrics> = mutableMapOf() private fun processHostEvent(event: HostEvent) { val slices = event.duration / SLICE_LENGTH @@ -173,14 +175,14 @@ public class WebExperimentMonitor : ExperimentMonitor { hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)), - hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0, - hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0 + hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0, + hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0 ) - hostMetrics.compute(event.host) { key, prev -> + hostMetrics.compute(event.node) { _, prev -> HostMetrics( - (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), event.vmCount + (prev?.vmCount ?: 0), 1 + (prev?.count ?: 0) ) @@ -208,7 +210,7 @@ public class WebExperimentMonitor : ExperimentMonitor { private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerMetrics = AggregateProvisionerMetrics( max(event.totalVmCount, provisionerMetrics.vmTotalCount), max(event.waitingVmCount, provisionerMetrics.vmWaitingCount), diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 812b5f20..f74c5697 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -84,33 +84,33 @@ public class SimBareMetalMachine( private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock) /** - * The execution context in which the workload runs. - */ - private val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimBareMetalMachine.model - - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = this@SimBareMetalMachine.model + + override val clock: Clock + get() = this@SimBareMetalMachine.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { Cpu(it, workload) } + this.cpus = model.cpus.map { Cpu(ctx, it, workload) } for (cpu in cpus) { cpu.start() @@ -161,7 +161,7 @@ public class SimBareMetalMachine( /** * A physical CPU of the machine. */ - private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) { + private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) { /** * The current command. */ diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt index c7c3d3cc..657dac66 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt @@ -41,6 +41,11 @@ public interface SimExecutionContext { public val machine: SimMachineModel /** + * The metadata associated with the context. + */ + public val meta: Map<String, Any> + + /** * Ask the host machine to interrupt the specified vCPU. * * @param cpu The id of the vCPU to interrupt. diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 5e86d32b..bf6d8a5e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -336,33 +336,33 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener private var cpus: List<VCpu> = emptyList() /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimFairShareHypervisor.ctx.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { VCpu(this, it, workload) } + this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) } for (cpu in cpus) { // Register vCPU to scheduler @@ -417,7 +417,12 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable<VCpu> { + private inner class VCpu( + val vm: SimVm, + val ctx: SimExecutionContext, + val model: ProcessingUnit, + val workload: SimWorkload + ) : Comparable<VCpu> { /** * The latest command processed by the CPU. */ @@ -488,7 +493,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener isIntermediate = true latestFlush = ctx.clock.millis() - process(workload.onStart(vm.ctx, model.id)) + process(workload.onStart(ctx, model.id)) } catch (e: Throwable) { fail(e) } finally { @@ -514,7 +519,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener // Act like nothing has happened in case the vCPU did not reach its deadline or was not // interrupted by the user. if (interrupt || command.deadline <= now) { - process(workload.onNext(vm.ctx, model.id, 0.0)) + process(workload.onNext(ctx, model.id, 0.0)) } } is SimResourceCommand.Consume -> { @@ -545,7 +550,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener totalOvercommittedWork += remainingWork } - process(workload.onNext(vm.ctx, model.id, remainingWork)) + process(workload.onNext(ctx, model.id, remainingWork)) } else { process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ea8eeb37..bfaa60bc 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -42,7 +42,7 @@ public interface SimMachine : AutoCloseable { /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - public suspend fun run(workload: SimWorkload) + public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) /** * Terminate this machine. diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 66d3eda7..778b68ca 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -117,33 +117,33 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen private var cpus: List<VCpu> = emptyList() /** - * The execution context in which the workload runs. - */ - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload) { + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { require(!isTerminated) { "Machine is terminated" } require(cont == null) { "Run should not be called concurrently" } + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimSpaceSharedHypervisor.ctx.clock + + override val meta: Map<String, Any> + get() = meta + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) } + this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) } for (cpu in cpus) { cpu.start() @@ -193,7 +193,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { + private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { /** * The processing speed of the vCPU. */ @@ -267,7 +267,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen * Interrupt the CPU. */ fun interrupt() { - ctx.interrupt(pCPU) + this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU) } /** diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt index 948595b1..10f29f4e 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt @@ -58,12 +58,22 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) private class EventFlowImpl<T> : EventFlow<T> { private var closed: Boolean = false - private val subscribers = HashMap<SendChannel<T>, Unit>() + private val subscribers = mutableListOf<SendChannel<T>>() override fun emit(event: T) { + if (closed) { + return + } + + val it = subscribers.iterator() synchronized(this) { - for ((chan, _) in subscribers) { - chan.offer(event) + while (it.hasNext()) { + val chan = it.next() + if (chan.isClosedForSend) { + it.remove() + } else { + chan.offer(event) + } } } } @@ -72,9 +82,11 @@ private class EventFlowImpl<T> : EventFlow<T> { synchronized(this) { closed = true - for ((chan, _) in subscribers) { + for (chan in subscribers) { chan.close() } + + subscribers.clear() } } @@ -87,9 +99,13 @@ private class EventFlowImpl<T> : EventFlow<T> { } channel = Channel(Channel.UNLIMITED) - subscribers[channel] = Unit + subscribers.add(channel) + } + try { + channel.consumeAsFlow().collect(collector) + } finally { + channel.close() } - channel.consumeAsFlow().collect(collector) } override fun toString(): String = "EventFlow" diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts index b4ffac7d..b6a2fc45 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflows/build.gradle.kts @@ -29,15 +29,16 @@ plugins { } dependencies { + api(platform(project(":opendc-platform"))) api(project(":opendc-core")) - api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) - implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}") + implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-compute:opendc-compute-simulator")) testImplementation(project(":opendc-format")) testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") - testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index e04c8a4c..6b348ed4 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -25,16 +25,10 @@ package org.opendc.workflows.service import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.virt.service.VirtProvisioningService +import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable @@ -55,13 +49,13 @@ public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, internal val tracer: EventTracer, - private val provisioningService: VirtProvisioningService, + private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, taskOrderPolicy: TaskOrderPolicy -) : WorkflowService { +) : WorkflowService, ServerWatcher { /** * The logger instance to use. */ @@ -103,12 +97,6 @@ public class StageWorkflowService( internal val taskByServer = mutableMapOf<Server, TaskState>() /** - * The load of the system. - */ - internal val load: Double - get() = (activeTasks.size / provisioningService.hostCount.toDouble()) - - /** * The root listener of this scheduler. */ private val rootListener = object : StageWorkflowSchedulerListener { @@ -205,7 +193,7 @@ public class StageWorkflowService( /** * Indicate to the scheduler that a scheduling cycle is needed. */ - private suspend fun requestCycle() = mode.requestCycle() + private fun requestCycle() = mode.requestCycle() /** * Perform a scheduling cycle immediately. @@ -273,15 +261,13 @@ public class StageWorkflowService( val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task val image = instance.task.image coroutineScope.launch { - val server = provisioningService.deploy(instance.task.name, image, flavor) + val server = computeClient.newServer(instance.task.name, image, flavor) instance.state = TaskStatus.ACTIVE instance.server = server taskByServer[server] = instance - server.events - .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(coroutineScope) + server.watch(this@StageWorkflowService) } activeTasks += instance @@ -290,8 +276,8 @@ public class StageWorkflowService( } } - private suspend fun stateChanged(server: Server) { - when (server.state) { + public override fun onStateChanged(server: Server, newState: ServerState) { + when (newState) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt index d1eb6704..ef9714c2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt @@ -22,7 +22,7 @@ package org.opendc.workflows.service -import org.opendc.compute.core.Server +import org.opendc.compute.api.Server import org.opendc.workflows.workload.Task public class TaskState(public val job: JobState, public val task: Task) { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt index d03adc61..cf8f92e0 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package org.opendc.workflows.service import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.yield import org.opendc.workflows.service.stage.StagePolicy /** @@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo /** * Request a new scheduling cycle to be performed. */ - public suspend fun requestCycle() + public fun requestCycle() } /** @@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo */ public object Interactive : WorkflowSchedulerMode() { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { - yield() - scheduler.schedule() + override fun requestCycle() { + scheduler.coroutineScope.launch { scheduler.schedule() } } } @@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. @@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { val delay = random.nextInt(200).toLong() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt deleted file mode 100644 index 4f0c269a..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.job - -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService - -/** - * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load. - * - * @property limit The maximum load before stopping admission. - */ -public data class LoadJobAdmissionPolicy(public val limit: Double) : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { - override fun invoke( - job: JobState - ): JobAdmissionPolicy.Advice = - if (scheduler.load < limit) - JobAdmissionPolicy.Advice.ADMIT - else - JobAdmissionPolicy.Advice.STOP - } - - override fun toString(): String = "Limit-Load($limit)" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt deleted file mode 100644 index a80a8c63..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.task - -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState - -/** - * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load. - */ -public data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { - override fun invoke( - task: TaskState - ): TaskEligibilityPolicy.Advice = - if (scheduler.load < limit) - TaskEligibilityPolicy.Advice.ADMIT - else - TaskEligibilityPolicy.Advice.STOP - } - - override fun toString(): String = "Limit-Load($limit)" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt index 1834a4c8..4c6d2842 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt @@ -24,7 +24,7 @@ package org.opendc.workflows.workload -import org.opendc.compute.core.image.Image +import org.opendc.compute.api.Image import org.opendc.core.Identity import java.util.* diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 2bfcba35..4207cdfd 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -36,11 +36,12 @@ import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer @@ -80,7 +81,11 @@ internal class StageWorkflowSchedulerIntegrationTest { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000) + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } // Wait for the hypervisors to be spawned delay(10) @@ -89,7 +94,7 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope, clock, tracer, - provisioner, + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflows/src/test/resources/log4j2.xml new file mode 100644 index 00000000..70a0eacc --- /dev/null +++ b/simulator/opendc-workflows/src/test/resources/log4j2.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Copyright (c) 2021 AtLarge Research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN" packages="org.apache.logging.log4j.core"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/> + </Console> + </Appenders> + <Loggers> + <Root level="warn"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 77d78318..7a82adcd 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -23,8 +23,10 @@ rootProject.name = "opendc-simulator" include(":opendc-platform") include(":opendc-core") -include(":opendc-compute:opendc-compute-core") +include(":opendc-compute:opendc-compute-api") +include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") +include(":opendc-metal") include(":opendc-workflows") include(":opendc-format") include(":opendc-experiments:opendc-experiments-sc18") |
