diff options
41 files changed, 567 insertions, 366 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts b/simulator/opendc-compute/opendc-compute-core/build.gradle.kts index 3f56f410..9aa444e3 100644 --- a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-core/build.gradle.kts @@ -31,6 +31,7 @@ dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-api")) + api(project(":opendc-compute:opendc-compute-service")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt deleted file mode 100644 index 0f7b5826..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt +++ /dev/null @@ -1,3 +0,0 @@ -package org.opendc.compute.core.virt - -public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt deleted file mode 100644 index a83b68dc..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt.service - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.api.ComputeClient -import org.opendc.compute.core.virt.Host - -/** - * A service for VM provisioning on a cloud. - */ -public interface VirtProvisioningService { - /** - * The events emitted by the service. - */ - public val events: Flow<VirtProvisioningEvent> - - /** - * Obtain the active hypervisors for this provisioner. - */ - public suspend fun drivers(): Set<Host> - - /** - * The number of hosts available in the system. - */ - public val hostCount: Int - - /** - * Create a new [ComputeClient] to control the compute service. - */ - public fun newClient(): ComputeClient - - /** - * Terminate the provisioning service releasing all the leased bare-metal machines. - */ - public suspend fun terminate() -} diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts new file mode 100644 index 00000000..1b09ef6d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "OpenDC Compute Service implementation" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-compute:opendc-compute-api")) + api(project(":opendc-trace:opendc-trace-core")) + implementation(project(":opendc-utils")) + implementation("io.github.microutils:kotlin-logging") + + testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt new file mode 100644 index 00000000..593e4b56 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.ComputeClient +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.trace.core.EventTracer +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The [ComputeService] hosts the API implementation of the OpenDC Compute service. + */ +public interface ComputeService : AutoCloseable { + /** + * The events emitted by the service. + */ + public val events: Flow<ComputeServiceEvent> + + /** + * The hosts that are used by the compute service. + */ + public val hosts: Set<Host> + + /** + * The number of hosts available in the system. + */ + public val hostCount: Int + + /** + * Create a new [ComputeClient] to control the compute service. + */ + public fun newClient(): ComputeClient + + /** + * Add a [host] to the scheduling pool of the compute service. + */ + public fun addHost(host: Host) + + /** + * Remove a [host] from the scheduling pool of the compute service. + */ + public fun removeHost(host: Host) + + /** + * Terminate the lifecycle of the compute service, stopping all running instances. + */ + public override fun close() + + public companion object { + /** + * Construct a new [ComputeService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param tracer The event tracer to use. + * @param allocationPolicy The allocation policy to use. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + tracer: EventTracer, + allocationPolicy: AllocationPolicy, + schedulingQuantum: Long = 300000, + ): ComputeService { + return ComputeServiceImpl(context, clock, tracer, allocationPolicy, schedulingQuantum) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt index abd2fc95..193008a7 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,22 +20,22 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service +package org.opendc.compute.service /** - * An event that is emitted by the [VirtProvisioningService]. + * An event that is emitted by the [ComputeService]. */ -public sealed class VirtProvisioningEvent { +public sealed class ComputeServiceEvent { /** * The service that has emitted the event. */ - public abstract val provisioner: VirtProvisioningService + public abstract val provisioner: ComputeService /** * An event emitted for writing metrics. */ public data class MetricsAvailable( - override val provisioner: VirtProvisioningService, + override val provisioner: ComputeService, public val totalHostCount: Int, public val availableHostCount: Int, public val totalVmCount: Int, @@ -45,5 +43,5 @@ public sealed class VirtProvisioningEvent { public val inactiveVmCount: Int, public val waitingVmCount: Int, public val failedVmCount: Int - ) : VirtProvisioningEvent() + ) : ComputeServiceEvent() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index 9a96ff6f..2cd91144 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt +package org.opendc.compute.service.driver import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.Server @@ -36,6 +36,16 @@ public interface Host { public val uid: UUID /** + * The name of this host. + */ + public val name: String + + /** + * The machine model of the host. + */ + public val model: HostModel + + /** * The state of the host. */ public val state: HostState diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt index a07523e8..97350679 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt - -import org.opendc.compute.core.metal.Node +package org.opendc.compute.service.driver /** * An event that is emitted by a [Host]. @@ -70,6 +68,5 @@ public sealed class HostEvent { public val cpuUsage: Double, public val cpuDemand: Double, public val numberOfDeployedImages: Int, - public val host: Node ) : HostEvent() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt index 4ebb9066..f076cae3 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt +package org.opendc.compute.service.driver import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState @@ -32,10 +32,10 @@ public interface HostListener { /** * This method is invoked when the state of an [instance][server] on [host] changes. */ - public fun onStateChange(host: Host, server: Server, newState: ServerState) {} + public fun onStateChanged(host: Host, server: Server, newState: ServerState) {} /** * This method is invoked when the state of a [Host] has changed. */ - public fun onStateChange(host: Host, newState: HostState) {} + public fun onStateChanged(host: Host, newState: HostState) {} } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt new file mode 100644 index 00000000..5632a55e --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver + +/** + * Describes the static machine properties of the host. + * + * @property vcpuCount The number of logical processing cores available for this host. + * @property memorySize The amount of memory available for this host in MB. + */ +public data class HostModel(public val cpuCount: Int, public val memorySize: Long) diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt index 7f87f5f7..6d85ee2d 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt +package org.opendc.compute.service.driver /** * The state of a host. diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt index c1802e64..a7974062 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event import java.util.* diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt index 1fea21ea..75bb09ed 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event import java.util.* diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt index 662068dd..f59c74b7 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt index 96103129..eaf0736b 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt index e31c0864..fa0a8a13 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.compute.api.Flavor import org.opendc.compute.api.Image diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt index d0e5c102..52b91616 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.core.virt.service.events +package org.opendc.compute.service.events import org.opendc.trace.core.Event diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index 642587da..f84b7435 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ClientServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.simulator +package org.opendc.compute.service.internal import org.opendc.compute.api.Flavor import org.opendc.compute.api.Image diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index bdea93e3..69d6bb59 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,63 +20,69 @@ * SOFTWARE. */ -package org.opendc.compute.simulator +package org.opendc.compute.service.internal -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine import mu.KotlinLogging import org.opendc.compute.api.* -import org.opendc.compute.core.* -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.NodeEvent -import org.opendc.compute.core.metal.NodeState -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.* -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.virt.service.VirtProvisioningService -import org.opendc.compute.core.virt.service.events.* -import org.opendc.compute.simulator.allocation.AllocationPolicy -import org.opendc.simulator.compute.SimHypervisorProvider +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.events.* +import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.trace.core.EventTracer import org.opendc.utils.TimerScheduler import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.math.max -@OptIn(ExperimentalCoroutinesApi::class) -public class SimVirtProvisioningService( - private val coroutineScope: CoroutineScope, +/** + * Internal implementation of the OpenDC Compute service. + * + * @param context The [CoroutineContext] to use. + * @param clock The clock instance to keep track of time. + */ +public class ComputeServiceImpl( + private val context: CoroutineContext, private val clock: Clock, - private val provisioningService: ProvisioningService, - public val allocationPolicy: AllocationPolicy, private val tracer: EventTracer, - private val hypervisor: SimHypervisorProvider, - private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds -) : VirtProvisioningService, HostListener { + private val allocationPolicy: AllocationPolicy, + private val schedulingQuantum: Long +) : ComputeService, HostListener { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context) + /** - * The logger instance to use. + * The logger instance of this server. */ private val logger = KotlinLogging.logger {} /** - * A mapping from host to hypervisor view. + * The [Random] instance used to generate unique identifiers for the objects. */ - private val hostToHv = mutableMapOf<Host, HypervisorView>() + private val random = Random(0) /** - * The hypervisors that have been launched by the service. + * A mapping from host to host view. */ - private val hypervisors: MutableMap<Node, HypervisorView> = mutableMapOf() + private val hostToView = mutableMapOf<Host, HostView>() /** * The available hypervisors. */ - private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf() + private val availableHosts: MutableSet<HostView> = mutableSetOf() /** * The servers that should be launched by the service. @@ -88,11 +94,6 @@ public class SimVirtProvisioningService( */ private val activeServers: MutableSet<Server> = mutableSetOf() - /** - * The [Random] instance used to generate unique identifiers for the objects. - */ - private val random = Random(0) - public var submittedVms: Int = 0 public var queuedVms: Int = 0 public var runningVms: Int = 0 @@ -107,39 +108,20 @@ public class SimVirtProvisioningService( */ private val allocationLogic = allocationPolicy() - override val events: Flow<VirtProvisioningEvent> + override val events: Flow<ComputeServiceEvent> get() = _events - private val _events = EventFlow<VirtProvisioningEvent>() + private val _events = EventFlow<ComputeServiceEvent>() /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ - private var scheduler: TimerScheduler<Unit> = TimerScheduler(coroutineScope, clock) - - init { - coroutineScope.launch { - val provisionedNodes = provisioningService.nodes() - provisionedNodes.forEach { node -> - val workload = SimHost(UUID(random.nextLong(), random.nextLong()), coroutineScope, hypervisor) - workload.addListener(this@SimVirtProvisioningService) - val hypervisorImage = Image(UUID.randomUUID(), "vmm", mapOf("workload" to workload)) - launch { - val deployedNode = provisioningService.deploy(node, hypervisorImage) - deployedNode.events.onEach { event -> - when (event) { - is NodeEvent.StateChanged -> stateChanged(event.node, workload) - } - }.launchIn(this) - } - } - } - } + private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope, clock) - override suspend fun drivers(): Set<Host> { - return availableHypervisors.map { it.driver }.toSet() - } + override val hosts: Set<Host> + get() = hostToView.keys - override val hostCount: Int = hypervisors.size + override val hostCount: Int + get() = hostToView.size override fun newClient(): ComputeClient = object : ComputeClient { private var isClosed: Boolean = false @@ -149,10 +131,10 @@ public class SimVirtProvisioningService( tracer.commit(VmSubmissionEvent(name, image, flavor)) _events.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, ++submittedVms, runningVms, finishedVms, @@ -175,9 +157,30 @@ public class SimVirtProvisioningService( override fun toString(): String = "ComputeClient" } - override suspend fun terminate() { - val provisionedNodes = provisioningService.nodes() - provisionedNodes.forEach { node -> provisioningService.stop(node) } + override fun addHost(host: Host) { + // Check if host is already known + if (host in hostToView) { + return + } + + val hv = HostView(host) + maxCores = max(maxCores, host.model.cpuCount) + maxMemory = max(maxMemory, host.model.memorySize) + hostToView[host] = hv + + if (host.state == HostState.UP) { + availableHosts += hv + } + + host.addListener(this) + } + + override fun removeHost(host: Host) { + host.removeListener(this) + } + + override fun close() { + scope.cancel() } private fun createServer( @@ -213,19 +216,19 @@ public class SimVirtProvisioningService( while (queue.isNotEmpty()) { val (server, cont) = queue.peekFirst() val requiredMemory = server.flavor.memorySize - val selectedHv = allocationLogic.select(availableHypervisors, server) + val selectedHv = allocationLogic.select(availableHosts, server) - if (selectedHv == null || !selectedHv.driver.canFit(server)) { + if (selectedHv == null || !selectedHv.host.canFit(server)) { logger.trace { "Server $server selected for scheduling but no capacity available for it." } if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { tracer.commit(VmSubmissionInvalidEvent(server.name)) _events.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, submittedVms, runningVms, finishedVms, @@ -244,88 +247,63 @@ public class SimVirtProvisioningService( } } - try { - logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" } - queue.poll() - - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += server.flavor.cpuCount - selectedHv.availableMemory -= requiredMemory // XXX Temporary hack - - coroutineScope.launch { - try { - cont.resume(ClientServer(server)) - selectedHv.driver.spawn(server) - activeServers += server - - tracer.commit(VmScheduledEvent(server.name)) - _events.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) + logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" } + queue.poll() + + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + selectedHv.numberOfActiveServers++ + selectedHv.provisionedCores += server.flavor.cpuCount + selectedHv.availableMemory -= requiredMemory // XXX Temporary hack + + scope.launch { + try { + cont.resume(ClientServer(server)) + selectedHv.host.spawn(server) + activeServers += server + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms ) - } catch (e: InsufficientMemoryOnServerException) { - logger.error("Failed to deploy VM", e) + ) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= server.flavor.cpuCount - selectedHv.availableMemory += requiredMemory - } + selectedHv.numberOfActiveServers-- + selectedHv.provisionedCores -= server.flavor.cpuCount + selectedHv.availableMemory += requiredMemory } - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) } } } - private fun stateChanged(node: Node, hypervisor: SimHost) { - when (node.state) { - NodeState.ACTIVE -> { - logger.debug { "[${clock.millis()}] Server ${node.uid} available: ${node.state}" } + override fun onStateChanged(host: Host, newState: HostState) { + when (newState) { + HostState.UP -> { + logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } - if (node in hypervisors) { + val hv = hostToView[host] + if (hv != null) { // Corner case for when the hypervisor already exists - availableHypervisors += hypervisors.getValue(node) - } else { - val hv = HypervisorView( - node.uid, - node, - 0, - node.flavor.memorySize, - 0 - ) - hv.driver = hypervisor - hv.driver.events - .onEach { event -> - if (event is HostEvent.VmsUpdated) { - hv.numberOfActiveServers = event.numberOfActiveServers - hv.availableMemory = event.availableMemory - } - }.launchIn(coroutineScope) - - maxCores = max(maxCores, node.flavor.cpuCount) - maxMemory = max(maxMemory, node.flavor.memorySize) - hypervisors[node] = hv - hostToHv[hypervisor] = hv - availableHypervisors += hv + availableHosts += hv } - tracer.commit(HypervisorAvailableEvent(node.uid)) + tracer.commit(HypervisorAvailableEvent(host.uid)) _events.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, submittedVms, runningVms, finishedVms, @@ -339,18 +317,19 @@ public class SimVirtProvisioningService( requestCycle() } } - NodeState.SHUTOFF, NodeState.ERROR -> { - logger.debug { "[${clock.millis()}] Server ${node.uid} unavailable: ${node.state}" } - val hv = hypervisors[node] ?: return - availableHypervisors -= hv + HostState.DOWN -> { + logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + + val hv = hostToView[host] ?: return + availableHosts -= hv tracer.commit(HypervisorUnavailableEvent(hv.uid)) _events.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, submittedVms, runningVms, finishedVms, @@ -363,11 +342,10 @@ public class SimVirtProvisioningService( requestCycle() } } - else -> throw IllegalStateException() } } - override fun onStateChange(host: Host, server: Server, newState: ServerState) { + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { val serverImpl = server as ServerImpl serverImpl.state = newState serverImpl.watchers.forEach { it.onStateChanged(server, newState) } @@ -378,10 +356,10 @@ public class SimVirtProvisioningService( tracer.commit(VmStoppedEvent(server.name)) _events.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, submittedVms, --runningVms, ++finishedVms, @@ -391,9 +369,11 @@ public class SimVirtProvisioningService( ) activeServers -= server - val hv = hostToHv[host] + val hv = hostToView[host] if (hv != null) { hv.provisionedCores -= server.flavor.cpuCount + hv.numberOfActiveServers-- + hv.availableMemory += server.flavor.memorySize } else { logger.error { "Unknown host $host" } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index a650144b..1bdfdf1a 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -20,18 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.simulator +package org.opendc.compute.service.internal -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.Host +import org.opendc.compute.service.driver.Host import java.util.UUID -public class HypervisorView( - public val uid: UUID, - public var node: Node, - public var numberOfActiveServers: Int, - public var availableMemory: Long, - public var provisionedCores: Int -) { - public lateinit var driver: Host +public class HostView(public val host: Host) { + public val uid: UUID + get() = host.uid + + public var numberOfActiveServers: Int = 0 + public var availableMemory: Long = host.model.memorySize + public var provisionedCores: Int = 0 } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt index f01e4064..5ee4c70f 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt @@ -20,11 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler import org.opendc.compute.api.Server -import org.opendc.compute.core.metal.Node -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * A policy for selecting the [Node] an image should be deployed to, @@ -38,9 +37,9 @@ public interface AllocationPolicy { * Select the node on which the server should be scheduled. */ public fun select( - hypervisors: Set<HypervisorView>, + hypervisors: Set<HostView>, server: Server - ): HypervisorView? + ): HostView? } /** diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt index 5e044282..ad422415 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * An [AllocationPolicy] that selects the machine with the highest/lowest amount of memory per core. @@ -31,8 +31,8 @@ import org.opendc.compute.simulator.HypervisorView */ public class AvailableCoreMemoryAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = - compareBy<HypervisorView> { -it.availableMemory / it.node.flavor.cpuCount } + override val comparator: Comparator<HostView> = + compareBy<HostView> { -it.availableMemory / it.host.model.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt index e87abd7b..6712b8a2 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * Allocation policy that selects the node with the most available memory. @@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView */ public class AvailableMemoryAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { -it.availableMemory } + override val comparator: Comparator<HostView> = compareBy<HostView> { -it.availableMemory } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt index a1a1f984..265d514d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler import org.opendc.compute.api.Server -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node. @@ -32,18 +32,18 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { /** * The comparator to use. */ - public val comparator: Comparator<HypervisorView> + public val comparator: Comparator<HostView> override fun select( - hypervisors: Set<HypervisorView>, + hypervisors: Set<HostView>, server: Server - ): HypervisorView? { + ): HostView? { return hypervisors.asSequence() .filter { hv -> val fitsMemory = hv.availableMemory >= (server.flavor.memorySize) - val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount + val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } - .minWithOrNull(comparator.thenBy { it.node.uid }) + .minWithOrNull(comparator.thenBy { it.host.uid }) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt index 5e2b895c..29eba782 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * Allocation policy that selects the node with the least amount of active servers. @@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView */ public class NumberOfActiveServersAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { it.numberOfActiveServers } + override val comparator: Comparator<HostView> = compareBy<HostView> { it.numberOfActiveServers } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt index 91441ecd..4c196953 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView /** * An [AllocationPolicy] that takes into account the number of vCPUs that have been provisioned on this machine @@ -33,8 +33,8 @@ import org.opendc.compute.simulator.HypervisorView */ public class ProvisionedCoresAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic { - override val comparator: Comparator<HypervisorView> = - compareBy<HypervisorView> { it.provisionedCores / it.node.flavor.cpuCount } + override val comparator: Comparator<HostView> = + compareBy<HostView> { it.provisionedCores / it.host.model.cpuCount } .run { if (reversed) reversed() else this } } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt index e6d9e1ce..3facb182 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler import org.opendc.compute.api.Server -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView import kotlin.random.Random /** @@ -33,13 +33,13 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al @OptIn(ExperimentalStdlibApi::class) override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( - hypervisors: Set<HypervisorView>, + hypervisors: Set<HostView>, server: Server - ): HypervisorView? { + ): HostView? { return hypervisors.asIterable() .filter { hv -> val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long) - val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount + val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } .randomOrNull(random) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt index 9f8fa544..ed1dc662 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt @@ -24,7 +24,8 @@ package org.opendc.compute.simulator.allocation import mu.KotlinLogging import org.opendc.compute.api.Server -import org.opendc.compute.simulator.HypervisorView +import org.opendc.compute.service.internal.HostView +import org.opendc.compute.service.scheduler.AllocationPolicy private val logger = KotlinLogging.logger {} @@ -37,12 +38,12 @@ private val logger = KotlinLogging.logger {} public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( - hypervisors: Set<HypervisorView>, + hypervisors: Set<HostView>, server: Server - ): HypervisorView? { + ): HostView? { val clusterName = vmPlacements[server.name] ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") - val machinesInCluster = hypervisors.filter { it.node.name.contains(clusterName) } + val machinesInCluster = hypervisors.filter { it.host.name.contains(clusterName) } if (machinesInCluster.isEmpty()) { logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 9808cf50..801f73dd 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -30,7 +30,7 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.core.* import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.* +import org.opendc.compute.service.driver.* import org.opendc.simulator.compute.* import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel @@ -45,7 +45,7 @@ import kotlin.coroutines.resume * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. */ public class SimHost( - override val uid: UUID, + public val node: Node, private val coroutineScope: CoroutineScope, hypervisor: SimHypervisorProvider ) : Host, SimWorkload { @@ -96,8 +96,7 @@ public class SimHost( interferedWork, cpuUsage, cpuDemand, - guests.size, - node + guests.size ) ) } @@ -107,19 +106,22 @@ public class SimHost( /** * The virtual machines running on the hypervisor. */ - private val guests = HashMap<Server, SimGuest>() + private val guests = HashMap<Server, Guest>() - /** - * The node on which the hypervisor runs. - */ - public val node: Node - get() = ctx.meta["node"] as Node + override val uid: UUID + get() = node.uid + + override val name: String + get() = node.name + + override val model: HostModel + get() = HostModel(node.flavor.cpuCount, node.flavor.memorySize) override val state: HostState get() = _state private var _state: HostState = HostState.UP set(value) { - listeners.forEach { it.onStateChange(this, value) } + listeners.forEach { it.onStateChanged(this, value) } field = value } @@ -138,7 +140,7 @@ public class SimHost( } require(canFit(server)) { "Server does not fit" } - val guest = SimGuest(server, hypervisor.createMachine(server.flavor.toMachineModel())) + val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) guests[server] = guest if (start) { @@ -187,24 +189,24 @@ public class SimHost( return SimMachineModel(processingUnits, memoryUnits) } - private fun onGuestStart(vm: SimGuest) { + private fun onGuestStart(vm: Guest) { guests.forEach { _, guest -> if (guest.state == ServerState.ACTIVE) { vm.performanceInterferenceModel?.onStart(vm.server.image.name) } } - listeners.forEach { it.onStateChange(this, vm.server, vm.state) } + listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } - private fun onGuestStop(vm: SimGuest) { + private fun onGuestStop(vm: Guest) { guests.forEach { _, guest -> if (guest.state == ServerState.ACTIVE) { vm.performanceInterferenceModel?.onStop(vm.server.image.name) } } - listeners.forEach { it.onStateChange(this, vm.server, vm.state) } + listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) } @@ -212,7 +214,7 @@ public class SimHost( /** * A virtual machine instance that the driver manages. */ - private inner class SimGuest(val server: Server, val machine: SimMachine) { + private inner class Guest(val server: Server, val machine: SimMachine) { val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? var state: ServerState = ServerState.SHUTOFF diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt new file mode 100644 index 00000000..1c51162d --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import kotlinx.coroutines.* +import org.opendc.compute.api.Image +import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.service.driver.Host +import org.opendc.simulator.compute.SimHypervisorProvider +import kotlin.coroutines.CoroutineContext + +/** + * A helper class to provision [SimHost]s on top of bare-metal machines using the [ProvisioningService]. + * + * @param context The [CoroutineContext] to use. + * @param metal The [ProvisioningService] to use. + * @param hypervisor The type of hypervisor to use. + */ +public class SimHostProvisioner( + private val context: CoroutineContext, + private val metal: ProvisioningService, + private val hypervisor: SimHypervisorProvider +) : AutoCloseable { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context) + + /** + * Provision all machines with a host. + */ + public suspend fun provisionAll(): List<Host> = coroutineScope { + metal.nodes().map { node -> async { provision(node) } }.awaitAll() + } + + /** + * Provision the specified [Node]. + */ + public suspend fun provision(node: Node): Host = coroutineScope { + val host = SimHost(node, scope, hypervisor) + metal.deploy(node, Image(node.uid, node.name, mapOf("workload" to host))) + host + } + + override fun close() { + scope.cancel() + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 7b654473..d3b6c4bc 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,6 +24,7 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -37,7 +38,9 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher -import org.opendc.compute.core.virt.HostEvent +import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.metal.NodeState +import org.opendc.compute.service.driver.HostEvent import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -79,8 +82,13 @@ internal class SimHostTest { var grantedWork = 0L var overcommittedWork = 0L + val node = Node( + UUID.randomUUID(), "name", emptyMap(), NodeState.SHUTOFF, + Flavor(machineModel.cpus.size, machineModel.memory.map { it.size }.sum()), Image.EMPTY, emptyFlow() + ) + scope.launch { - val virtDriver = SimHost(UUID.randomUUID(), this, SimFairShareHypervisorProvider()) + val virtDriver = SimHost(node, this, SimFairShareHypervisorProvider()) val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver)) val duration = 5 * 60L val vmImageA = Image( diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 2be3fe99..eb819b58 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -39,13 +39,15 @@ import org.opendc.compute.api.ServerWatcher import org.opendc.compute.core.metal.NODE_CLUSTER import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HostEvent -import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader @@ -137,6 +139,12 @@ public fun createTraceReader( ) } +public data class ProvisionerResult( + val metal: ProvisioningService, + val provisioner: SimHostProvisioner, + val compute: ComputeServiceImpl +) + /** * Construct the environment for a VM provisioner and return the provisioner instance. */ @@ -146,33 +154,40 @@ public suspend fun createProvisioner( environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): Pair<ProvisioningService, SimVirtProvisioningService> { +): ProvisionerResult { val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) + val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider()) + val hosts = provisioner.provisionAll() + + val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + + for (host in hosts) { + scheduler.addHost(host) + } // Wait for the hypervisors to be spawned delay(10) - return bareMetalProvisioner to scheduler + return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler) } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public suspend fun attachMonitor( +public fun attachMonitor( coroutineScope: CoroutineScope, clock: Clock, - scheduler: SimVirtProvisioningService, + scheduler: ComputeService, monitor: ExperimentMonitor ) { - val hypervisors = scheduler.drivers() + val hypervisors = scheduler.hosts // Monitor hypervisor events for (hypervisor in hypervisors) { @@ -201,7 +216,7 @@ public suspend fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.host + (event.driver as SimHost).node ) } } @@ -216,7 +231,7 @@ public suspend fun attachMonitor( scheduler.events .onEach { event -> when (event) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> monitor.reportProvisionerMetrics(clock.millis(), event) } } @@ -230,7 +245,7 @@ public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, - scheduler: SimVirtProvisioningService, + scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor ) { @@ -265,7 +280,7 @@ public suspend fun processTrace( scheduler.events .takeWhile { when (it) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> it.inactiveVmCount + it.failedVmCount != submitted } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 75b0d735..ff0a026d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { ) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 9e4adcc5..6039289f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -25,8 +25,8 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.Host -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import java.io.Closeable /** @@ -73,5 +73,5 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 0912c8ae..b879399c 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -26,8 +26,8 @@ import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.Host -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter @@ -176,7 +176,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerWriter.write( ProvisionerEvent( time, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index fca523cd..73525ae2 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,8 +34,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.core.metal.Node import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain import org.opendc.experiments.capelin.experiment.createProvisioner @@ -97,7 +97,7 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: SimVirtProvisioningService + lateinit var scheduler: ComputeServiceImpl val tracer = EventTracer(clock) testScope.launch { @@ -108,8 +108,8 @@ class CapelinIntegrationTest { allocationPolicy, tracer ) - val bareMetalProvisioner = res.first - scheduler = res.second + val bareMetalProvisioner = res.metal + scheduler = res.compute val failureDomain = if (failures) { println("ENABLING failures") @@ -138,8 +138,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() monitor.close() + res.provisioner.close() } runSimulation() @@ -148,9 +149,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1679510908774, monitor.totalRequestedBurst) }, - { assertEquals(384100282091, monitor.totalGrantedBurst) }, - { assertEquals(1282152242721, monitor.totalOvercommissionedBurst) }, + { assertEquals(1678587333640, monitor.totalRequestedBurst) }, + { assertEquals(438118200924, monitor.totalGrantedBurst) }, + { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -162,19 +163,16 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - lateinit var scheduler: SimVirtProvisioningService val tracer = EventTracer(clock) testScope.launch { - val res = createProvisioner( + val (_, provisioner, scheduler) = createProvisioner( this, clock, environmentReader, allocationPolicy, tracer ) - scheduler = res.second - attachMonitor(this, clock, scheduler, monitor) processTrace( this, @@ -187,17 +185,18 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - scheduler.terminate() + scheduler.close() monitor.close() + provisioner.close() } runSimulation() // Note that these values have been verified beforehand assertAll( - { assertEquals(710487768664, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(118846235815, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(584211294239, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(705128393965, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 6e888a3e..66fdfb41 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -25,8 +25,9 @@ package org.opendc.experiments.sc18 import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.harness.dsl.Experiment @@ -91,16 +92,17 @@ public class UnderspecificationExperiment : Experiment("underspecification") { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService( - testScope, + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService( + testScope.coroutineContext, clock, - bareMetal, - NumberOfActiveServersAllocationPolicy(), tracer, - SimSpaceSharedHypervisorProvider(), - schedulingQuantum = 1000 + NumberOfActiveServersAllocationPolicy(), ) + hosts.forEach { compute.addHost(it) } + // Wait for the hypervisors to be spawned delay(10) @@ -108,7 +110,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") { testScope, clock, tracer, - provisioner.newClient(), + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 533bf321..482fe754 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -40,6 +40,11 @@ import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document import org.bson.types.ObjectId +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -242,7 +247,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val tracer = EventTracer(clock) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -281,7 +286,8 @@ public class RunnerCli : CliktCommand(name = "runner") { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index f0c6297f..6209cff2 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -27,8 +27,8 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeState -import org.opendc.compute.core.virt.Host -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.telemetry.HostEvent import kotlin.math.max @@ -210,7 +210,7 @@ public class WebExperimentMonitor : ExperimentMonitor { private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerMetrics = AggregateProvisionerMetrics( max(event.totalVmCount, provisionerMetrics.vmTotalCount), max(event.waitingVmCount, provisionerMetrics.vmWaitingCount), diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 14289c67..f71cd7d7 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -37,8 +37,9 @@ import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider @@ -80,7 +81,11 @@ internal class StageWorkflowSchedulerIntegrationTest { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000) + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } // Wait for the hypervisors to be spawned delay(10) @@ -89,7 +94,7 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope, clock, tracer, - provisioner.newClient(), + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 6e6a9d29..d2bc92a0 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -24,6 +24,7 @@ rootProject.name = "opendc-simulator" include(":opendc-platform") include(":opendc-core") include(":opendc-compute:opendc-compute-api") +include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-core") include(":opendc-compute:opendc-compute-simulator") include(":opendc-workflows") |
