diff options
16 files changed, 560 insertions, 327 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/RunRequest.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt index cba8ca78..c081acd5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/RunRequest.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt @@ -22,8 +22,27 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt +package com.atlarge.opendc.compute.core.execution -import kotlin.coroutines.Continuation +import com.atlarge.opendc.compute.core.ProcessingUnit -data class RunRequest(val req: LongArray, val reqDuration: Long, val continuation: Continuation<LongArray>) +/** + * An interface for managing a single processing core (CPU) of a (virtual) machine. + */ +public interface ProcessorContext { + /** + * The information about the processing unit. + */ + public val info: ProcessingUnit + + /** + * Request the specified burst time from the processor and suspend execution until the processor finishes + * processing of the requested burst. + * + * @param burst The burst time to request from the processor. + * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param deadline The instant at which this request needs to be fulfilled. + * @return The remaining burst time in case the method was cancelled or zero if the processor finished running. + */ + public suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/Protocol.kt deleted file mode 100644 index fa40d4a3..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/Protocol.kt +++ /dev/null @@ -1,152 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.execution - -import com.atlarge.odcsim.ReceivePort -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.Server -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch - -/** - * Request that are accepted by a [ServerContext] instance. - */ -public sealed class ServerRequest { - /** - * Request the context to be initialized. - */ - public object Initialize : ServerRequest() - - /** - * Request for each core the specified amount of cpu time to run from the server. - */ - public data class Run(public val req: LongArray, public val reqDuration: Long) : ServerRequest() - - /** - * Terminate the execution of the server. - */ - public data class Exit(public val cause: Throwable? = null) : ServerRequest() -} - -/** - * Messages sent in response to [ServerRequest] objects. - */ -public sealed class ServerResponse { - /** - * The server that sent this response. - */ - public abstract val server: Server - - /** - * The amount cpu time granted on this server. - */ - public abstract val rec: LongArray? - - /** - * Indicate that this request was processed successfully. - */ - public data class Ok( - public override val server: Server, - public override val rec: LongArray? = null - ) : ServerResponse() -} - -/** - * Serialize the specified [ServerManagementContext] instance in order to safely send this object across logical - * processes. - */ -public suspend fun ServerManagementContext.serialize(): ServerManagementContext { - val ctx = processContext - val input = ctx.open<ServerRequest>() - val output = ctx.open<ServerResponse>() - - ctx.launch { - val outlet = processContext.connect(output.send) - val inlet = processContext.listen(input.receive) - - while (isActive) { - when (val msg = inlet.receive()) { - is ServerRequest.Initialize -> { - init() - outlet.send(ServerResponse.Ok(server)) - } - is ServerRequest.Run -> { - val rec = run(msg.req, msg.reqDuration) - outlet.send(ServerResponse.Ok(server, rec)) - } - is ServerRequest.Exit -> { - exit(msg.cause) - outlet.send(ServerResponse.Ok(server)) - } - } - } - } - - return object : ServerManagementContext { - private lateinit var inlet: ReceivePort<ServerResponse> - private lateinit var outlet: SendPort<ServerRequest> - - override var server: Server = this@serialize.server - - override suspend fun run(req: LongArray, reqDuration: Long): LongArray { - outlet.send(ServerRequest.Run(req, reqDuration)) - - when (val res = inlet.receive()) { - is ServerResponse.Ok -> { - server = res.server - return res.rec ?: error("Received should be defined in this type of request.") - } - } - } - - override suspend fun exit(cause: Throwable?) { - outlet.send(ServerRequest.Exit(cause)) - - when (val res = inlet.receive()) { - is ServerResponse.Ok -> { - server = res.server - } - } - } - - override suspend fun init() { - if (!this::outlet.isInitialized) { - outlet = processContext.connect(input.send) - } - - if (!this::inlet.isInitialized) { - inlet = processContext.listen(output.receive) - } - - outlet.send(ServerRequest.Initialize) - when (val res = inlet.receive()) { - is ServerResponse.Ok -> { - server = res.server - } - } - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 539a991b..53b00aa6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -38,14 +38,9 @@ public interface ServerContext { public val server: Server /** - * Request for each core the specified amount of cpu time to run from the server and wait until all the threads have - * finished processing. If none of the cores are non-zero, the method will return immediately. - * - * @param req An array specifying for each core the amount of cpu time to request. - * @param reqDuration A [Long] specifying the duration in which this request needs to be fulfilled. - * @return An array specifying for each core the amount of cpu time it actually received. + * A list of available processor context instances to use. */ - public suspend fun run(req: LongArray, reqDuration: Long): LongArray + public val cpus: List<ProcessorContext> /** * Publishes the given [service] with key [serviceKey] in the server's registry. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 4af7c706..f09adc84 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,6 +26,8 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.min @@ -50,6 +52,8 @@ class FlopsApplicationImage( ) : Image { init { require(flops >= 0) { "Negative number of flops" } + require(cores > 0) { "Negative number of cores or no cores" } + require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } /** @@ -57,7 +61,12 @@ class FlopsApplicationImage( */ override suspend fun invoke(ctx: ServerContext) { val cores = min(this.cores, ctx.server.flavor.cpus.sumBy { it.cores }) - val req = (flops * (1 / utilization) / cores).toLong() - ctx.run(LongArray(cores) { req }, req) + val req = flops / cores + + coroutineScope { + for (cpu in ctx.cpus.take(cores)) { + launch { cpu.run(req, cpu.info.clockRate * utilization, Long.MAX_VALUE) } + } + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index be24aa00..b7eacd88 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -2,13 +2,12 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import java.util.UUID -import kotlin.math.ceil import kotlin.math.min -public val VM_SCHEDULING_SLICE_DURATION = 5 * 60 * 1000L - class VmImage( public override val uid: UUID, public override val name: String, @@ -16,16 +15,20 @@ class VmImage( public val flopsHistory: List<FlopsHistoryFragment>, public val cores: Int ) : Image { + override suspend fun invoke(ctx: ServerContext) { flopsHistory.forEach { fragment -> if (fragment.flops == 0L) { delay(fragment.duration) } else { - for (time in fragment.tick until fragment.tick + fragment.duration step VM_SCHEDULING_SLICE_DURATION) { - val cores = min(this.cores, ctx.server.flavor.cpus.sumBy { it.cores }) - val req = - (fragment.flops / (ceil(fragment.duration.toDouble() / VM_SCHEDULING_SLICE_DURATION)) / cores).toLong() - ctx.run(LongArray(cores) { req }, VM_SCHEDULING_SLICE_DURATION) + val cores = min(this.cores, ctx.server.flavor.cpus.sumBy { it.cores }) + val req = fragment.flops / cores + + coroutineScope { + for (cpu in ctx.cpus.take(cores)) { + val usage = req / (fragment.duration * 1_000_000L).toDouble() + launch { cpu.run(req, usage, fragment.tick + fragment.duration) } + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 827c1d38..29573007 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,19 +24,25 @@ package com.atlarge.opendc.compute.metal.driver +import com.atlarge.odcsim.ProcessContext import com.atlarge.odcsim.processContext +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerFlavor import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.delay import java.util.UUID +import kotlin.math.ceil import kotlin.math.max +import kotlin.math.min /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -114,8 +120,33 @@ public class SimpleBareMetalDriver( } } + private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { + override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { + val start = processContext.clock.millis() + val usage = min(maxUsage, info.clockRate) + + try { + val duration = min(max(0, deadline - start), ceil(burst / usage).toLong()) + delay(duration) + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + val end = processContext.clock.millis() + val granted = ceil((end - start) * usage * 1_000_000).toLong() + return max(0, burst - granted) + } + } + private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false + private lateinit var ctx: ProcessContext + + override val cpus: List<ProcessorContextImpl> = flavor.cpus + .asSequence() + .flatMap { cpu -> + generateSequence { ProcessorContextImpl(cpu) }.take(cpu.cores) + } + .toList() override var server: Server get() = node.server!! @@ -131,7 +162,7 @@ public class SimpleBareMetalDriver( val previousState = server.state server = server.copy(state = ServerState.ACTIVE) monitor.onUpdate(server, previousState) - + ctx = processContext initialized = true } @@ -142,12 +173,5 @@ public class SimpleBareMetalDriver( monitor.onUpdate(server, previousState) initialized = false } - - override suspend fun run(req: LongArray, reqDuration: Long): LongArray { - // TODO Properly implement this for multiple CPUs - val time = max(0, req.max() ?: 0) / (flavor.cpus[0].clockRate * 1000) - delay(max(time.toLong(), reqDuration)) - return req - } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt deleted file mode 100644 index 863626ad..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ /dev/null @@ -1,123 +0,0 @@ -package com.atlarge.opendc.compute.virt.driver - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerFlavor -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.core.execution.ServerManagementContext -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.image.VM_SCHEDULING_SLICE_DURATION -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.virt.RunRequest -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import java.util.UUID -import kotlin.coroutines.resume -import kotlin.math.min - -class SimpleVirtDriver( - private val ctx: ProcessContext, - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor -) : VirtDriver { - /** - * The contexts of all VMs running on this hypervisor. - */ - private val serverContexts: MutableSet<VmServerContext> = mutableSetOf() - - init { - ctx.launch { - while (isActive) { - val serverFlavor = hostContext.server.flavor - - val requests = serverContexts.map { ctx.async { it.channel.receive() } }.awaitAll() - require(requests.all { it.reqDuration == VM_SCHEDULING_SLICE_DURATION }) - - if (requests.isEmpty()) { - hostContext.run(LongArray(serverFlavor.cpus[0].cores) { 0 }, 5 * 60 * 1000) - } else { - val totalRequested = requests.map { it.req.sum() }.sum() - val capacity = (serverFlavor.cpus[0].cores * serverFlavor.cpus[0].clockRate * 1_000_000L).toLong() - - hypervisorMonitor.onSliceFinish( - processContext.clock.millis(), - totalRequested, - capacity, - serverContexts.size, - hostContext.server - ) - - val satisfiedCapacity = min(capacity, totalRequested) - requests.forEach { request -> - val individualAssignedCapacity = ( - satisfiedCapacity * (request.req.sum().toDouble() / totalRequested) / - request.req.size).toLong() - - request.continuation.resume( - hostContext.run( - LongArray(request.req.size) { individualAssignedCapacity }, - VM_SCHEDULING_SLICE_DURATION - ) - ) - } - } - } - } - } - - override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: ServerFlavor): Server { - val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) - val context = VmServerContext(server, monitor, flavor, hostContext, Channel(Channel.CONFLATED)) - serverContexts.add(context) - context.init() - processContext.launch { image(context) } - return server - } - - override suspend fun getNumberOfSpawnedImages(): Int { - return serverContexts.size - } - - class VmServerContext( - override var server: Server, - val monitor: ServerMonitor, - val flavor: ServerFlavor, - val hostContext: ServerContext, - val channel: Channel<RunRequest> - ) : - ServerManagementContext { - private var initialized: Boolean = false - - override suspend fun init() { - if (initialized) { - throw IllegalStateException() - } - - val previousState = server.state - server = server.copy(state = ServerState.ACTIVE) - monitor.onUpdate(server, previousState) - initialized = true - } - - override suspend fun exit(cause: Throwable?) { - val previousState = server.state - val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR - server = server.copy(state = state) - monitor.onUpdate(server, previousState) - initialized = false - } - - override suspend fun run(req: LongArray, reqDuration: Long): LongArray { - return suspendCancellableCoroutine { cont -> - channel.offer(RunRequest(req, reqDuration, cont)) - } - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt index b66e04d2..fc7322db 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt @@ -22,12 +22,10 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt +package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.processContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.core.resource.TagContainer @@ -45,7 +43,7 @@ class HypervisorImage( override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = SimpleVirtDriver(processContext, ctx, hypervisorMonitor) + val driver = HypervisorVirtDriver(VmSchedulerImpl(ctx, hypervisorMonitor)) ctx.publishService(VirtDriver.Key, driver) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt new file mode 100644 index 00000000..9ae71f14 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -0,0 +1,99 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.driver.hypervisor + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.processContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerFlavor +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.execution.ProcessorContext +import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import java.util.UUID + +/** + * A [VirtDriver] that is backed by a simple hypervisor implementation. + */ +class HypervisorVirtDriver(private val scheduler: VmScheduler) : VirtDriver { + /** + * A set for tracking the VM context objects. + */ + internal val vms: MutableSet<VmServerContext> = mutableSetOf() + + override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: ServerFlavor): Server { + val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) + vms.add(VmServerContext(server, monitor, flavor, processContext)) + return server + } + + override suspend fun getNumberOfSpawnedImages(): Int { + return vms.size + } + + internal inner class VmServerContext( + override var server: Server, + val monitor: ServerMonitor, + flavor: ServerFlavor, + ctx: ProcessContext + ) : ServerManagementContext { + private var initialized: Boolean = false + + internal val job: Job = ctx.launch { + init() + try { + server.image(this@VmServerContext) + exit() + } catch (cause: Throwable) { + exit(cause) + } + } + + override val cpus: List<ProcessorContext> = scheduler.createVirtualCpus(flavor) + + override suspend fun init() { + if (initialized) { + throw IllegalStateException() + } + + val previousState = server.state + server = server.copy(state = ServerState.ACTIVE) + monitor.onUpdate(server, previousState) + initialized = true + } + + override suspend fun exit(cause: Throwable?) { + val previousState = server.state + val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR + server = server.copy(state = state) + monitor.onUpdate(server, previousState) + initialized = false + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt index 9d76927f..f02ac2b3 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorState.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt @@ -22,19 +22,17 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt +package com.atlarge.opendc.compute.virt.driver.hypervisor + +import com.atlarge.opendc.compute.core.ServerFlavor +import com.atlarge.opendc.compute.core.execution.ProcessorContext /** - * The power state of a compute node. + * A scheduler that assigns virtual CPUs to virtual machines and maps them to physical CPUs. */ -public enum class HypervisorState { - /** - * Hypervisor is running. - */ - RUNNING, - +public interface VmScheduler { /** - * Hypervisor is destroyed. + * Create the virtual CPUs for the specified [flavor]. */ - DESTROYED, + fun createVirtualCpus(flavor: ServerFlavor): List<ProcessorContext> } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt new file mode 100644 index 00000000..b6be935e --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt @@ -0,0 +1,190 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.driver.hypervisor + +import com.atlarge.odcsim.processContext +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.core.ServerFlavor +import com.atlarge.opendc.compute.core.execution.ProcessorContext +import com.atlarge.opendc.compute.core.execution.ServerContext +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A basic implementation of the [VmScheduler] interface. + * + * @property hostContext The [ServerContext] of the host. + * @property hypervisorMonitor The [HypervisorMonitor] to inform with hypervisor scheduling events. + */ +public class VmSchedulerImpl( + private val hostContext: ServerContext, + private val hypervisorMonitor: HypervisorMonitor +) : VmScheduler { + /** + * The available physical CPUs to schedule. + */ + private val cpus = hostContext.cpus.map { HostProcessorContext(it, hostContext, hypervisorMonitor) } + + override fun createVirtualCpus(flavor: ServerFlavor): List<ProcessorContext> { + return cpus.asSequence() + .take(flavor.cpus.sumBy { it.cores }) + .map { VirtualProcessorContext(it) } + .toList() + } + + /** + * A wrapper around a host [ProcessorContext] that carries additional information about the vCPUs scheduled on the + * processor. + */ + internal class HostProcessorContext( + delegate: ProcessorContext, + private val hostContext: ServerContext, + private val hypervisorMonitor: HypervisorMonitor + ) : ProcessorContext by delegate { + /** + * The set of vCPUs scheduled on this processor. + */ + var vcpus: MutableSet<VirtualProcessorContext> = mutableSetOf() + + /** + * The deferred run call. + */ + var call: Job? = null + + /** + * Schedule the vCPUs on the physical CPU. + */ + suspend fun reschedule() { + flush() + + val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment + val call = processContext.launch { + var duration: Long = Long.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + + for (vcpu in vcpus) { + // Limit each vCPU to at most an equal share of the host CPU + vcpu.actualUsage = min(vcpu.requestedUsage, info.clockRate / vcpus.size) + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, ceil(vcpu.requestedBurst / vcpu.actualUsage).toLong()) + deadline = min(deadline, vcpu.requestedDeadline) + } + + var burst: Long = 0 + var usage: Double = 0.0 + + for (vcpu in vcpus) { + vcpu.actualBurst = (duration * vcpu.actualUsage).toLong() + burst += vcpu.actualBurst + usage += vcpu.actualUsage + } + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + val remainder = run(burst, usage, deadline) + val time = processContext.clock.millis() + var totalGrantedBurst: Long = 0 + + // Compute for each vCPU the + for (vcpu in vcpus) { + // Compute the fraction of compute time allocated to the VM + val fraction = vcpu.actualUsage / usage + // Compute the burst time that the VM was actually granted + val grantedBurst = max(0, vcpu.actualBurst - ceil(remainder * fraction).toLong()) + totalGrantedBurst += grantedBurst + // Compute remaining burst time to be executed for the request + vcpu.requestedBurst = max(0, vcpu.requestedBurst - grantedBurst) + + if (vcpu.requestedBurst == 0L || vcpu.requestedDeadline <= time) { + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded + vcpu.chan.send(Unit) + } + } + + hypervisorMonitor.onSliceFinish( + time, + burst, + totalGrantedBurst, + vcpus.size, + hostContext.server + ) + } + + this.call = call + call.invokeOnCompletion { this.call = null } + } + + /** + * Flush the progress of the current active VMs. + */ + fun flush() { + val call = call ?: return // If there is no active call, there is nothing to flush + // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // completion. + call.cancel() + } + } + + /** + * An implementation of [ProcessorContext] that delegates the work to a physical CPU. + */ + internal class VirtualProcessorContext(val host: HostProcessorContext) : ProcessorContext { + var actualBurst: Long = 0 + var actualUsage: Double = 0.0 + var requestedBurst: Long = 0 + var requestedUsage: Double = 0.0 + var requestedDeadline: Long = 0 + var chan = Channel<Unit>(Channel.RENDEZVOUS) + + override val info: ProcessingUnit + get() = host.info + + override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { + requestedBurst = burst + requestedUsage = maxUsage + requestedDeadline = deadline + + // Wait until the burst has been run or the coroutine is cancelled + try { + host.vcpus.add(this) + host.reschedule() + chan.receive() + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + + host.vcpus.remove(this) + host.reschedule() + return requestedBurst + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt index f034cc3f..d4d71aaa 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt @@ -10,15 +10,15 @@ interface HypervisorMonitor { * Invoked after a scheduling slice has finished processed. * * @param time The current time (in ms). - * @param totalRequestedCpuTime The total requested CPU time (can be above capacity). - * @param totalCpuTimeCapacity The actual total capacity of the machine managed by this hypervisor. + * @param totalRequestedBurst The total requested CPU time (can be above capacity). + * @param totalGrantedBurst The actual total granted capacity. * @param numberOfDeployedImages The number of images deployed on this hypervisor. * @param hostServer The server hosting this hypervisor. */ fun onSliceFinish( time: Long, - totalRequestedCpuTime: Long, - totalCpuTimeCapacity: Long, + totalRequestedBurst: Long, + totalGrantedBurst: Long, numberOfDeployedImages: Int, hostServer: Server ) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 2b1eed1e..baf3f9ef 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -7,7 +7,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.launch @@ -51,7 +51,10 @@ class SimpleVirtProvisioningService( ctx.launch { val provisionedNodes = provisioningService.nodes().toList() val deployedNodes = provisionedNodes.map { node -> - val hypervisorImage = HypervisorImage(hypervisorMonitor) + val hypervisorImage = + HypervisorImage( + hypervisorMonitor + ) hypervisorByNode[node] = hypervisorImage provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt new file mode 100644 index 00000000..417db77d --- /dev/null +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt @@ -0,0 +1,78 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.image + +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.util.UUID + +/** + * Test suite for [FlopsApplicationImage] + */ +@DisplayName("FlopsApplicationImage") +internal class FlopsApplicationImageTest { + @Test + fun `flops must be non-negative`() { + assertThrows<IllegalArgumentException>("FLOPs must be non-negative") { + FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), -1, 1) + } + } + + @Test + fun `cores cannot be zero`() { + assertThrows<IllegalArgumentException>("Cores cannot be zero") { + FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 0) + } + } + + @Test + fun `cores cannot be negative`() { + assertThrows<IllegalArgumentException>("Cores cannot be negative") { + FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, -1) + } + } + + @Test + fun `utilization cannot be zero`() { + assertThrows<IllegalArgumentException>("Utilization cannot be zero") { + FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 1, 0.0) + } + } + + @Test + fun `utilization cannot be negative`() { + assertThrows<IllegalArgumentException>("Utilization cannot be negative") { + FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 1, -1.0) + } + } + + @Test + fun `utilization cannot be larger than one`() { + assertThrows<IllegalArgumentException>("Utilization cannot be larger than one") { + FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 1, 2.0) + } + } +} diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt new file mode 100644 index 00000000..ce0ed10d --- /dev/null +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -0,0 +1,92 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.driver.hypervisor + +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.processContext +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerFlavor +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.image.FlopsApplicationImage +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.metal.PowerState +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID + +/** + * Basic test-suite for the hypervisor. + */ +internal class HypervisorTest { + /** + * A smoke test for the bare-metal driver. + */ + @Test + fun smoke() { + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider({ _ -> + val metalFlavor = ServerFlavor(listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1))) + val vmm = HypervisorImage(object : HypervisorMonitor { + override fun onSliceFinish( + time: Long, + totalRequestedBurst: Long, + totalGrantedBurst: Long, + numberOfDeployedImages: Int, + hostServer: Server + ) { + println("Hello World!") + } + }) + val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000, 1) + val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1) + val monitor = object : ServerMonitor { + override suspend fun onUpdate(server: Server, previousState: ServerState) { + println("[${processContext.clock.millis()}]: $server") + } + } + val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", metalFlavor) + + metalDriver.init(monitor) + metalDriver.setImage(vmm) + metalDriver.setPower(PowerState.POWER_ON) + delay(5) + + val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver] + vmDriver.spawn(workloadA, monitor, metalFlavor) + vmDriver.spawn(workloadB, monitor, metalFlavor) + }, name = "sim") + + runBlocking { + system.run() + system.terminate() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt index 5a277dff..7b1c2dbf 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt @@ -8,16 +8,16 @@ class Sc20HypervisorMonitor : HypervisorMonitor { private val outputFile = File("sc20-experiment-results.csv") init { - outputFile.appendText("time,totalRequestedCpuTime,totalCpuTimeCapacity,numberOfDeployedImages,server\n") + outputFile.writeText("time,totalRequestedBurst,totalGrantedBurst,numberOfDeployedImages,server\n") } override fun onSliceFinish( time: Long, - totalRequestedCpuTime: Long, - totalCpuTimeCapacity: Long, + totalRequestedBurst: Long, + totalGrantedBurst: Long, numberOfDeployedImages: Int, hostServer: Server ) { - outputFile.appendText("$time,$totalRequestedCpuTime,$totalCpuTimeCapacity,$numberOfDeployedImages,$numberOfDeployedImages,${hostServer.uid}\n") + outputFile.appendText("$time,$totalRequestedBurst,$totalGrantedBurst,$numberOfDeployedImages,$numberOfDeployedImages,${hostServer.uid}\n") } } |
