diff options
Diffstat (limited to 'simulator')
50 files changed, 259 insertions, 2231 deletions
diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/MemoryUnit.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/MemoryUnit.kt deleted file mode 100644 index f41c41c4..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/MemoryUnit.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core - -/** - * A memory unit of a compute resource, either virtual or physical. - * - * @property vendor The vendor string of the memory. - * @property modelName The name of the memory model. - * @property speed The access speed of the memory in MHz. - * @property size The size of the memory unit in MBs. - */ -public data class MemoryUnit( - public val vendor: String, - public val modelName: String, - public val speed: Double, - public val size: Long -) diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ProcessingNode.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ProcessingNode.kt deleted file mode 100644 index 7e4694d4..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ProcessingNode.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core - -/** - * A processing node/package/socket containing possibly several CPU cores. - * - * @property vendor The vendor string of the processor node. - * @property modelName The name of the processor node. - * @property arch The micro-architecture of the processor node. - * @property coreCount The number of logical CPUs in the processor node. - */ -public data class ProcessingNode( - public val vendor: String, - public val arch: String, - public val modelName: String, - public val coreCount: Int -) diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ProcessingUnit.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ProcessingUnit.kt deleted file mode 100644 index e6ee7f9a..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ProcessingUnit.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core - -/** - * A single logical compute unit of processor node, either virtual or physical. - * - * @property node The processing node containing the CPU core. - * @property id The identifier of the CPU core within the processing node. - * @property frequency The clock rate of the CPU in MHz. - */ -public data class ProcessingUnit( - public val node: ProcessingNode, - public val id: Int, - public val frequency: Double -) diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt index fbef8a7d..e9212832 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt @@ -22,8 +22,6 @@ package org.opendc.compute.core -import org.opendc.core.services.ServiceKey - /** * An event that is emitted by a [Server]. */ @@ -40,12 +38,4 @@ public sealed class ServerEvent { * @property previousState The previous state of the server. */ public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent() - - /** - * This event is emitted when a server publishes a service. - * - * @property server The server that published the service. - * @property key The service key of the service that was published. - */ - public data class ServicePublished(override val server: Server, val key: ServiceKey<*>) : ServerEvent() } diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ServerContext.kt deleted file mode 100644 index 82cec00a..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ServerContext.kt +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.execution - -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.select -import org.opendc.compute.core.ProcessingUnit -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.core.services.ServiceKey -import java.time.Clock - -/** - * Represents the execution context in which a bootable [Image] runs on a [Server]. - */ -public interface ServerContext { - /** - * The virtual clock. - */ - public val clock: Clock - - /** - * The server on which the image runs. - */ - public val server: Server - - /** - * A list of processing units available to use. - */ - public val cpus: List<ProcessingUnit> - - /** - * Publish the specified [service] at the given [ServiceKey]. - */ - public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) - - /** - * Ask the processor cores to run the specified [slice] and suspend execution until the trigger condition is met as - * specified by [triggerMode]. - * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * @param slice The representation of work to run on the processors. - * @param triggerMode The trigger condition to resume execution. - */ - public suspend fun run(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): Unit = - select { onRun(slice, triggerMode).invoke {} } - - /** - * Ask the processors cores to run the specified [batch] of work slices and suspend execution until the trigger - * condition is met as specified by [triggerMode]. - * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these - * slices with the next slice to be executed. - * - * @param batch The batch of work to run on the processors. - * @param triggerMode The trigger condition to resume execution. - * @param merge The merge function for consecutive slices in case the last slice was not completed within its - * deadline. - */ - public suspend fun run( - batch: Sequence<Slice>, - triggerMode: TriggerMode = TriggerMode.FIRST, - merge: (Slice, Slice) -> Slice = { _, r -> r } - ): Unit = select { onRun(batch, triggerMode, merge).invoke {} } - - /** - * Ask the processor cores to run the specified [slice] and select when the trigger condition is met as specified - * by [triggerMode]. - * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * @param slice The representation of work to request from the processors. - * @param triggerMode The trigger condition to resume execution. - */ - public fun onRun(slice: Slice, triggerMode: TriggerMode = TriggerMode.FIRST): SelectClause0 = - onRun(sequenceOf(slice), triggerMode) - - /** - * Ask the processors cores to run the specified [batch] of work slices and select when the trigger condition is met - * as specified by [triggerMode]. - * - * After the method returns, [Slice.burst] will contain the remaining burst length for each of the cores (which - * may be zero). These changes may happen anytime during execution of this method and callers should not rely on - * the timing of this change. - * - * In case slices in the batch do not finish processing before their deadline, [merge] is called to merge these - * slices with the next slice to be executed. - * - * @param batch The batch of work to run on the processors. - * @param triggerMode The trigger condition to resume execution during the **last** slice. - * @param merge The merge function for consecutive slices in case the last slice was not completed within its - * deadline. - */ - public fun onRun( - batch: Sequence<Slice>, - triggerMode: TriggerMode = TriggerMode.FIRST, - merge: (Slice, Slice) -> Slice = { _, r -> r } - ): SelectClause0 - - /** - * A request to the host machine for a slice of CPU time from the processor cores. - * - * Both [burst] and [limit] must be of the same size and in any other case the method will throw an - * [IllegalArgumentException]. - * - * - * @param burst The burst time to request from each of the processor cores. - * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this slice needs to be fulfilled. - */ - public class Slice(public val burst: LongArray, public val limit: DoubleArray, public val deadline: Long) { - init { - require(burst.size == limit.size) { "Incompatible array dimensions" } - } - } - - /** - * The modes for triggering a machine exit from the machine. - */ - public enum class TriggerMode { - /** - * A machine exit occurs when either the first processor finishes processing a **non-zero** burst or the - * deadline is reached. - */ - FIRST, - - /** - * A machine exit occurs when either the last processor finishes processing a **non-zero** burst or the deadline - * is reached. - */ - LAST, - - /** - * A machine exit occurs only when the deadline is reached. - */ - DEADLINE - } -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ServerManagementContext.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ServerManagementContext.kt deleted file mode 100644 index 51727e43..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ServerManagementContext.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.execution - -/** - * An extended [ServerContext] providing several methods for managing the execution context. - */ -public interface ServerManagementContext : ServerContext { - /** - * Initialize the management context. - */ - public suspend fun init() - - /** - * Terminate the execution of the server. - */ - public suspend fun exit(cause: Throwable? = null) -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ShutdownException.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ShutdownException.kt deleted file mode 100644 index d751fb5e..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/execution/ShutdownException.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.execution - -import kotlinx.coroutines.CancellationException - -/** - * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown flow - * has been sent to the server. - */ -public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : - CancellationException(message) - -/** - * This method terminates the current active coroutine if the specified [CancellationException] is caused - * by a shutdown. - */ -public fun CancellationException.assertShutdown() { - if (this is ShutdownException) { - throw this - } -} - -/** - * This method terminates the current active coroutine if the specified [CancellationException] is caused - * by a failure. - */ -public fun CancellationException.assertFailure() { - if (this is ShutdownException && cause != null) { - throw this - } -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt index 2b9158bf..01f86a1b 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt @@ -22,7 +22,6 @@ package org.opendc.compute.core.image -import org.opendc.compute.core.execution.ServerContext import org.opendc.core.resource.TagContainer import java.util.UUID @@ -33,6 +32,4 @@ public object EmptyImage : Image { override val uid: UUID = UUID.randomUUID() override val name: String = "empty" override val tags: TagContainer = emptyMap() - - override suspend fun invoke(ctx: ServerContext) {} } diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/FlopsApplicationImage.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/FlopsApplicationImage.kt deleted file mode 100644 index 58c00e13..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/FlopsApplicationImage.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.image - -import org.opendc.compute.core.execution.ServerContext -import org.opendc.core.resource.TagContainer -import java.util.UUID -import kotlin.math.min - -/** - * An application [Image] that models applications performing a static number of floating point operations ([flops]) on - * a compute resource. - * - * @property uid The unique identifier of this image. - * @property name The name of this image. - * @property tags The tags attached to the image. - * @property flops The number of floating point operations to perform for this task in MFLOPs. - * @property cores The number of cores that the image is able to utilize. - * @property utilization A model of the CPU utilization of the application. - */ -public data class FlopsApplicationImage( - public override val uid: UUID, - public override val name: String, - public override val tags: TagContainer, - public val flops: Long, - public val cores: Int, - public val utilization: Double = 0.8 -) : Image { - init { - require(flops >= 0) { "Negative number of flops" } - require(cores > 0) { "Negative number of cores or no cores" } - require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } - } - - /** - * Execute the runtime behavior based on a number of floating point operations to execute. - */ - override suspend fun invoke(ctx: ServerContext) { - val cores = min(this.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { flops / cores } - val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - - ctx.run(ServerContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = ServerContext.TriggerMode.LAST) - } -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/FlopsHistoryFragment.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/FlopsHistoryFragment.kt deleted file mode 100644 index 5472e4e0..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/FlopsHistoryFragment.kt +++ /dev/null @@ -1,3 +0,0 @@ -package org.opendc.compute.core.image - -public data class FlopsHistoryFragment(val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/Image.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/Image.kt index d04920c3..e481fcc3 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/Image.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/Image.kt @@ -22,7 +22,6 @@ package org.opendc.compute.core.image -import org.opendc.compute.core.execution.ServerContext import org.opendc.core.resource.Resource /** @@ -33,12 +32,4 @@ import org.opendc.core.resource.Resource * useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server * configuration frequently. */ -public interface Image : Resource { - /** - * Launch the machine image in the specified [ServerContext]. - * - * This method should encapsulate and characterize the runtime behavior of the instance resulting from launching - * the image on some machine, in terms of the resource consumption on the machine. - */ - public suspend operator fun invoke(ctx: ServerContext) -} +public interface Image : Resource diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/SimWorkloadImage.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/SimWorkloadImage.kt index 7caedf6d..1dd9a34b 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/SimWorkloadImage.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/SimWorkloadImage.kt @@ -22,7 +22,6 @@ package org.opendc.compute.core.image -import org.opendc.compute.core.execution.ServerContext import org.opendc.core.resource.TagContainer import org.opendc.simulator.compute.workload.SimWorkload import java.util.* @@ -40,6 +39,4 @@ public data class SimWorkloadImage( public override val name: String, public override val tags: TagContainer, public val workload: SimWorkload -) : Image { - override suspend fun invoke(ctx: ServerContext): Unit = TODO() -} +) : Image diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/VmImage.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/VmImage.kt deleted file mode 100644 index 72efbe0b..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/image/VmImage.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.image - -import org.opendc.compute.core.execution.ServerContext -import org.opendc.core.resource.TagContainer -import java.util.* -import kotlin.math.min - -public class VmImage( - public override val uid: UUID, - public override val name: String, - public override val tags: TagContainer, - public val flopsHistory: Sequence<FlopsHistoryFragment>, - public val maxCores: Int, - public val requiredMemory: Long -) : Image { - - override suspend fun invoke(ctx: ServerContext) { - var offset = ctx.clock.millis() - - val batch = flopsHistory.map { fragment -> - val cores = min(fragment.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } - offset += fragment.duration - ServerContext.Slice(burst, usage, offset) - } - - ctx.run(batch) - } - - override fun toString(): String = "VmImage(uid=$uid, name=$name, cores=$maxCores, requiredMemory=$requiredMemory)" -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/PerformanceInterferenceModel.kt index 7956c095..51f62197 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -22,7 +22,6 @@ package org.opendc.compute.core.workload -import org.opendc.compute.core.Server import java.util.* import kotlin.random.Random @@ -43,16 +42,40 @@ public class PerformanceInterferenceModel( private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList() private val colocatedWorkloads = TreeMap<String, Int>() - internal fun vmStarted(server: Server) { - colocatedWorkloads.merge(server.image.name, 1, Int::plus) + /** + * Indicate that a VM has started. + */ + public fun onStart(name: String) { + colocatedWorkloads.merge(name, 1, Int::plus) intersectingItems = items.filter { item -> doesMatch(item) } } - internal fun vmStopped(server: Server) { - colocatedWorkloads.computeIfPresent(server.image.name) { _, v -> (v - 1).takeUnless { it == 0 } } + /** + * Indicate that a VM has stopped. + */ + public fun onStop(name: String) { + colocatedWorkloads.computeIfPresent(name) { _, v -> (v - 1).takeUnless { it == 0 } } intersectingItems = items.filter { item -> doesMatch(item) } } + /** + * Compute the performance interference based on the current server load. + */ + public fun apply(currentServerLoad: Double): Double { + if (intersectingItems.isEmpty()) { + return 1.0 + } + val score = intersectingItems + .firstOrNull { it.minServerLoad <= currentServerLoad } + + // Apply performance penalty to (on average) only one of the VMs + return if (score != null && random.nextInt(score.workloadNames.size) == 0) { + score.performanceScore + } else { + 1.0 + } + } + private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { var count = 0 for ( @@ -67,21 +90,6 @@ public class PerformanceInterferenceModel( } return false } - - internal fun apply(currentServerLoad: Double): Double { - if (intersectingItems.isEmpty()) { - return 1.0 - } - val score = intersectingItems - .firstOrNull { it.minServerLoad <= currentServerLoad } - - // Apply performance penalty to (on average) only one of the VMs - return if (score != null && random.nextInt(score.workloadNames.size) == 0) { - score.performanceScore - } else { - 1.0 - } - } } /** diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt index 6ab8da2a..6c724277 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt @@ -22,7 +22,7 @@ package org.opendc.compute.core.workload -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.Image import org.opendc.core.User import org.opendc.core.workload.Workload import java.util.UUID @@ -39,7 +39,7 @@ public data class VmWorkload( override val uid: UUID, override val name: String, override val owner: User, - val image: VmImage + val image: Image ) : Workload { override fun equals(other: Any?): Boolean = other is VmWorkload && uid == other.uid diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt index 4f440342..4f72d599 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimBareMetalDriver.kt @@ -29,7 +29,6 @@ import org.opendc.compute.core.Server import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState import org.opendc.compute.core.execution.ComputeSimExecutionContext -import org.opendc.compute.core.execution.ShutdownException import org.opendc.compute.core.image.EmptyImage import org.opendc.compute.core.image.Image import org.opendc.compute.core.image.SimWorkloadImage @@ -89,6 +88,11 @@ public class SimBareMetalDriver( private val nodeState = StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + /** + * The [SimBareMetalMachine] we use to run the workload. + */ + private val machine = SimBareMetalMachine(coroutineScope, clock, machine) + override val node: Flow<Node> = nodeState override val usage: Flow<Double> @@ -102,11 +106,6 @@ public class SimBareMetalDriver( private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) /** - * The [SimBareMetalMachine] we use to run the workload. - */ - private val machine = SimBareMetalMachine(coroutineScope, clock, machine) - - /** * The [Job] that runs the simulated workload. */ private var job: Job? = null @@ -174,12 +173,12 @@ public class SimBareMetalDriver( private fun exitMachine(cause: Throwable?) { val newServerState = - if (cause == null || (cause is ShutdownException && cause.cause == null)) + if (cause == null) ServerState.SHUTOFF else ServerState.ERROR val newNodeState = - if (cause == null || (cause is ShutdownException && cause.cause != null)) + if (cause == null) nodeState.value.state else NodeState.ERROR diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimpleBareMetalDriver.kt deleted file mode 100644 index c4897197..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.metal.driver - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Delay -import kotlinx.coroutines.DisposableHandle -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.launch -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.MemoryUnit -import org.opendc.compute.core.ProcessingUnit -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.execution.ServerContext -import org.opendc.compute.core.execution.ServerManagementContext -import org.opendc.compute.core.execution.ShutdownException -import org.opendc.compute.core.image.EmptyImage -import org.opendc.compute.core.image.Image -import org.opendc.compute.metal.Node -import org.opendc.compute.metal.NodeEvent -import org.opendc.compute.metal.NodeState -import org.opendc.compute.metal.power.ConstantPowerModel -import org.opendc.core.power.PowerModel -import org.opendc.core.services.ServiceKey -import org.opendc.core.services.ServiceRegistry -import org.opendc.utils.flow.EventFlow -import org.opendc.utils.flow.StateFlow -import java.lang.Exception -import java.time.Clock -import java.util.UUID -import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random - -/** - * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. - * - * @param coroutineScope The [CoroutineScope] the driver runs in. - * @param clock The virtual clock to keep track of time. - * @param uid The unique identifier of the machine. - * @param name An optional name of the machine. - * @param metadata The initial metadata of the node. - * @param cpus The CPUs available to the bare metal machine. - * @param memoryUnits The memory units in this machine. - * @param powerModel The power model of this machine. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public class SimpleBareMetalDriver( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - uid: UUID, - name: String, - metadata: Map<String, Any>, - private val cpus: List<ProcessingUnit>, - private val memoryUnits: List<MemoryUnit>, - powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel( - 0.0 - ) -) : BareMetalDriver { - /** - * The flavor that corresponds to this machine. - */ - private val flavor = - Flavor(cpus.size, memoryUnits.map { it.size }.sum()) - - /** - * The current active server context. - */ - private var serverContext: BareMetalServerContext? = null - - /** - * The events of the machine. - */ - private val events = EventFlow<NodeEvent>() - - /** - * The flow containing the load of the server. - */ - private val usageState = MutableStateFlow(0.0) - - /** - * The machine state. - */ - private val nodeState = - StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) - - override val node: Flow<Node> = nodeState - - @OptIn(FlowPreview::class) - override val usage: Flow<Double> = usageState - - override val powerDraw: Flow<Double> = powerModel(this) - - /** - * The internal random instance. - */ - private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - - override suspend fun init(): Node { - return nodeState.value - } - - override suspend fun start(): Node { - val node = nodeState.value - if (node.state != NodeState.SHUTOFF) { - return node - } - - val events = EventFlow<ServerEvent>() - val server = Server( - UUID(random.nextLong(), random.nextLong()), - node.name, - emptyMap(), - flavor, - node.image, - ServerState.BUILD, - ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver), - events - ) - - setNode(node.copy(state = NodeState.BOOT, server = server)) - serverContext = BareMetalServerContext(events) - return nodeState.value - } - - override suspend fun stop(): Node { - val node = nodeState.value - if (node.state == NodeState.SHUTOFF) { - return node - } - - // We terminate the image running on the machine - serverContext!!.cancel(fail = false) - serverContext = null - - setNode(node.copy(state = NodeState.SHUTOFF, server = null)) - return node - } - - override suspend fun reboot(): Node { - stop() - return start() - } - - override suspend fun setImage(image: Image): Node { - setNode(nodeState.value.copy(image = image)) - return nodeState.value - } - - override suspend fun refresh(): Node = nodeState.value - - private fun setNode(value: Node) { - val field = nodeState.value - if (field.state != value.state) { - events.emit(NodeEvent.StateChanged(value, field.state)) - } - - if (field.server != null && value.server != null && field.server.state != value.server.state) { - serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) - } - - nodeState.value = value - } - - private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext { - private var finalized: Boolean = false - - // A state in which the machine is still available, but does not run any of the work requested by the - // image - var unavailable = false - - override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus - - override val server: Server - get() = nodeState.value.server!! - - override val clock: Clock - get() = this@SimpleBareMetalDriver.clock - - private val job = coroutineScope.launch { - delay(1) // TODO Introduce boot time - init() - try { - server.image(this@BareMetalServerContext) - exit() - } catch (cause: Throwable) { - exit(cause) - } - } - - /** - * Cancel the image running on the machine. - */ - suspend fun cancel(fail: Boolean) { - if (fail) - job.cancel(ShutdownException(cause = Exception("Random failure"))) - else - job.cancel(ShutdownException()) - job.join() - } - - override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { - val server = server.copy(services = server.services.put(key, service)) - setNode(nodeState.value.copy(server = server)) - events.emit(ServerEvent.ServicePublished(server, key)) - } - - override suspend fun init() { - assert(!finalized) { "Machine is already finalized" } - - val server = server.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) - } - - override suspend fun exit(cause: Throwable?) { - finalized = true - - val newServerState = - if (cause == null || (cause is ShutdownException && cause.cause == null)) - ServerState.SHUTOFF - else - ServerState.ERROR - val newNodeState = - if (cause == null || (cause is ShutdownException && cause.cause != null)) - nodeState.value.state - else - NodeState.ERROR - val server = server.copy(state = newServerState) - setNode(nodeState.value.copy(state = newNodeState, server = server)) - } - - /** - * A disposable to prevent resetting the usage state for subsequent calls to onRun. - */ - private var usageFlush: DisposableHandle? = null - - /** - * Cache the [Delay] instance for timing. - * - * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. - * XXX Note however that this is an ugly hack which may break in the future. - */ - @OptIn(InternalCoroutinesApi::class) - private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay - - @OptIn(InternalCoroutinesApi::class) - override fun onRun( - batch: Sequence<ServerContext.Slice>, - triggerMode: ServerContext.TriggerMode, - merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice - ): SelectClause0 { - assert(!finalized) { "Server instance is already finalized" } - - return object : SelectClause0 { - @InternalCoroutinesApi - override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - // Do not reset the usage state: we will set it ourselves - usageFlush?.dispose() - usageFlush = null - - val queue = batch.iterator() - var start = Long.MIN_VALUE - var currentWork: SliceWork? = null - var currentDisposable: DisposableHandle? = null - - fun schedule(slice: ServerContext.Slice) { - start = clock.millis() - - val isLastSlice = !queue.hasNext() - val work = SliceWork(slice) - val candidateDuration = when (triggerMode) { - ServerContext.TriggerMode.FIRST -> work.minExit - ServerContext.TriggerMode.LAST -> work.maxExit - ServerContext.TriggerMode.DEADLINE -> slice.deadline - start - } - - // Check whether the deadline is exceeded during the run of the slice. - val duration = min(candidateDuration, slice.deadline - start) - - val action = Runnable { - currentWork = null - - // Flush all the work that was performed - val hasFinished = work.stop(duration) - - if (!isLastSlice) { - val candidateSlice = queue.next() - val nextSlice = - // If our previous slice exceeds its deadline, merge it with the next candidate slice - if (hasFinished) - candidateSlice - else - merge(candidateSlice, slice) - schedule(nextSlice) - } else if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - - // Schedule the flush after the entire slice has finished - currentDisposable = delay.invokeOnTimeout(duration, action) - - // Start the slice work - currentWork = work - work.start() - } - - // Schedule the first work - if (queue.hasNext()) { - schedule(queue.next()) - - // A DisposableHandle to flush the work in case the call is cancelled - val disposable = DisposableHandle { - val end = clock.millis() - val duration = end - start - - currentWork?.stop(duration) - currentDisposable?.dispose() - - // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1) { - usageState.value = 0.0 - usageFlush = null - } - } - - select.disposeOnSelect(disposable) - } else if (select.trySelect()) { - // No work has been given: select immediately - block.startCoroutineCancellable(select.completion) - } - } - } - } - - /** - * A slice to be processed. - */ - private inner class SliceWork(val slice: ServerContext.Slice) { - /** - * The duration after which the first processor finishes processing this slice. - */ - public val minExit: Long - - /** - * The duration after which the last processor finishes processing this slice. - */ - public val maxExit: Long - - /** - * A flag to indicate that the slice will exceed the deadline. - */ - public val exceedsDeadline: Boolean - get() = slice.deadline < maxExit - - /** - * The total amount of CPU usage. - */ - public val totalUsage: Double - - /** - * A flag to indicate that this slice is empty. - */ - public val isEmpty: Boolean - - init { - var totalUsage = 0.0 - var minExit = Long.MAX_VALUE - var maxExit = 0L - var nonEmpty = false - - // Determine the duration of the first/last CPU to finish - for (i in 0 until min(cpus.size, slice.burst.size)) { - val cpu = cpus[i] - val usage = min(slice.limit[i], cpu.frequency) - val cpuDuration = - ceil(slice.burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds - - totalUsage += usage / cpu.frequency - - if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst - minExit = min(minExit, cpuDuration) - maxExit = max(maxExit, cpuDuration) - nonEmpty = true - } - } - - this.isEmpty = !nonEmpty - this.totalUsage = totalUsage - this.minExit = minExit - this.maxExit = maxExit - } - - /** - * Indicate that the work on the slice has started. - */ - public fun start() { - usageState.value = totalUsage / cpus.size - } - - /** - * Flush the work performed on the slice. - */ - public fun stop(duration: Long): Boolean { - var hasFinished = true - - // Only flush the work if the machine is available - if (!unavailable) { - for (i in 0 until min(cpus.size, slice.burst.size)) { - val usage = min(slice.limit[i], cpus[i].frequency) - val granted = ceil(duration / 1000.0 * usage).toLong() - val res = max(0, slice.burst[i] - granted) - slice.burst[i] = res - - if (res != 0L) { - hasFinished = false - } - } - } - - return hasFinished - } - } - } - - override val scope: CoroutineScope - get() = coroutineScope - - override suspend fun fail() { - serverContext?.unavailable = true - - val server = nodeState.value.server?.copy(state = ServerState.ERROR) - setNode(nodeState.value.copy(state = NodeState.ERROR, server = server)) - } - - override suspend fun recover() { - serverContext?.unavailable = false - - val server = nodeState.value.server?.copy(state = ServerState.ACTIVE) - setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) - } - - override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})" -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/HypervisorImage.kt deleted file mode 100644 index ff88f0dc..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/HypervisorImage.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.virt - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.suspendCancellableCoroutine -import org.opendc.compute.core.execution.ServerContext -import org.opendc.compute.core.image.Image -import org.opendc.compute.virt.driver.SimpleVirtDriver -import org.opendc.compute.virt.driver.VirtDriver -import org.opendc.core.resource.TagContainer -import java.util.UUID - -/** - * A hypervisor managing the VMs of a node. - */ -public object HypervisorImage : Image { - override val uid: UUID = UUID.randomUUID() - override val name: String = "vmm" - override val tags: TagContainer = emptyMap() - - override suspend fun invoke(ctx: ServerContext) { - coroutineScope { - val driver = SimpleVirtDriver(ctx, this) - ctx.publishService(VirtDriver.Key, driver) - - // Suspend image until it is cancelled - try { - suspendCancellableCoroutine<Unit> {} - } finally { - driver.cancel() - } - } - } -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriver.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriver.kt index 758315aa..6e0df93f 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriver.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimVirtDriver.kt @@ -28,7 +28,6 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch import org.opendc.compute.core.* import org.opendc.compute.core.execution.ComputeSimExecutionContext -import org.opendc.compute.core.execution.ShutdownException import org.opendc.compute.core.image.Image import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.virt.HypervisorEvent @@ -51,6 +50,12 @@ public class SimVirtDriver( ) : VirtDriver { /** + * The server hosting this hypervisor. + */ + public val server: Server + get() = (ctx as ComputeSimExecutionContext).server + + /** * The [EventFlow] to emit the events. */ internal val eventFlow = EventFlow<HypervisorEvent>() @@ -164,7 +169,7 @@ public class SimVirtDriver( private fun exit(cause: Throwable?) { val serverState = - if (cause == null || (cause is ShutdownException && cause.cause == null)) + if (cause == null) ServerState.SHUTOFF else ServerState.ERROR diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimpleVirtDriver.kt deleted file mode 100644 index d581e651..00000000 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ /dev/null @@ -1,640 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.virt.driver - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.selects.select -import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ProcessingUnit -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.execution.ServerContext -import org.opendc.compute.core.execution.ServerManagementContext -import org.opendc.compute.core.execution.ShutdownException -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.compute.core.workload.PerformanceInterferenceModel -import org.opendc.compute.virt.HypervisorEvent -import org.opendc.core.services.ServiceKey -import org.opendc.core.services.ServiceRegistry -import org.opendc.utils.flow.EventFlow -import java.time.Clock -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * The logging instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * A [VirtDriver] that is backed by a simple hypervisor implementation. - */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -public class SimpleVirtDriver( - private val hostContext: ServerContext, - scope: CoroutineScope -) : VirtDriver, CoroutineScope by scope { - /** - * The [Server] on which this hypervisor runs. - */ - public val server: Server - get() = hostContext.server - - /** - * A set for tracking the VM context objects. - */ - private val vms: MutableSet<VmServerContext> = mutableSetOf() - - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = hostContext.server.flavor.memorySize - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<HypervisorEvent>() - - override val events: Flow<HypervisorEvent> = eventFlow - - init { - launch { - try { - // Yield first to allow class variables to initialize - yield() - scheduler() - } catch (e: Exception) { - if (e !is CancellationException) { - logger.error("Hypervisor scheduler failed", e) - } - throw e - } - } - } - - override suspend fun spawn( - name: String, - image: Image, - flavor: org.opendc.compute.core.Flavor - ): Server { - val requiredMemory = flavor.memorySize - if (availableMemory - requiredMemory < 0) { - throw InsufficientMemoryOnServerException() - } - require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } - - val events = EventFlow<ServerEvent>() - val server = Server( - UUID.randomUUID(), - name, - emptyMap(), - flavor, - image, - ServerState.BUILD, - ServiceRegistry(), - events - ) - availableMemory -= requiredMemory - vms.add(VmServerContext(server, events)) - vmStarted(server) - eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) - return server - } - - internal fun cancel() { - eventFlow.close() - } - - private fun vmStarted(server: Server) { - vms.forEach { - val performanceModel = - it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - performanceModel?.vmStarted(server) - } - } - - private fun vmStopped(server: Server) { - vms.forEach { - val performanceModel = - it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - performanceModel?.vmStopped(server) - } - } - - /** - * A scheduling command processed by the scheduler. - */ - private sealed class SchedulerCommand { - /** - * Schedule the specified VM on the hypervisor. - */ - data class Schedule(val vm: Vm) : SchedulerCommand() - - /** - * De-schedule the specified VM on the hypervisor. - */ - data class Deschedule(val vm: Vm) : SchedulerCommand() - - /** - * Interrupt the scheduler. - */ - object Interrupt : SchedulerCommand() - } - - /** - * A flag to indicate the driver is stopped. - */ - private var stopped: Boolean = false - - /** - * The channel for scheduling new CPU requests. - */ - private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED) - - /** - * The scheduling process of the hypervisor. - */ - private suspend fun scheduler() { - val clock = hostContext.clock - val maxUsage = hostContext.cpus.sumByDouble { it.frequency } - val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } - - val vms = mutableSetOf<Vm>() - val vcpus = mutableListOf<VCpu>() - - val usage = DoubleArray(hostContext.cpus.size) - val burst = LongArray(hostContext.cpus.size) - - fun process(command: SchedulerCommand) { - when (command) { - is SchedulerCommand.Schedule -> { - vms += command.vm - vcpus.addAll(command.vm.vcpus) - } - is SchedulerCommand.Deschedule -> { - vms -= command.vm - vcpus.removeAll(command.vm.vcpus) - } - is SchedulerCommand.Interrupt -> { - } - } - } - - fun processRemaining() { - var command = schedulingQueue.poll() - while (command != null) { - process(command) - command = schedulingQueue.poll() - } - } - - while (!stopped) { - // Wait for a request to be submitted if we have no work yet. - if (vcpus.isEmpty()) { - process(schedulingQueue.receive()) - } - - processRemaining() - - val start = clock.millis() - - val vmCount = vms.size - var duration: Double = Double.POSITIVE_INFINITY - var deadline: Long = Long.MAX_VALUE - var availableUsage = maxUsage - var totalRequestedUsage = 0.0 - var totalRequestedBurst = 0L - - // Sort the vCPUs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - vcpus.sort() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in vcpus.withIndex()) { - val remaining = vcpus.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(req.limit, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, req.vm.deadline) - - // Ignore empty CPUs - if (grantedUsage <= 0 || req.burst <= 0) { - req.allocatedLimit = 0.0 - continue - } - - totalRequestedUsage += req.limit - totalRequestedBurst += req.burst - - req.allocatedLimit = grantedUsage - availableUsage -= grantedUsage - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, req.burst / grantedUsage) - } - - // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. - duration = 300.0 - - val totalAllocatedUsage = maxUsage - availableUsage - var totalAllocatedBurst = 0L - availableUsage = totalAllocatedUsage - val serverLoad = totalAllocatedUsage / maxUsage - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in pCPUs) { - val maxCpuUsage = hostContext.cpus[i].frequency - val fraction = maxCpuUsage / maxUsage - val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) - val grantedBurst = ceil(duration * grantedUsage).toLong() - - usage[i] = grantedUsage - burst[i] = grantedBurst - totalAllocatedBurst += grantedBurst - availableUsage -= grantedUsage - } - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - select<Boolean> { - schedulingQueue.onReceive { schedulingQueue.offer(it); true } - hostContext.onRun(ServerContext.Slice(burst, usage, deadline), ServerContext.TriggerMode.DEADLINE) - .invoke { false } - } - - val end = clock.millis() - - // No work was performed - if ((end - start) <= 0) { - continue - } - - // The total requested burst that the VMs wanted to run in the time-frame that we ran. - val totalRequestedSubBurst = - vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() - val totalRemainder = burst.sum() - val totalGrantedBurst = totalAllocatedBurst - totalRemainder - - // The burst that was lost due to overcommissioning of CPU resources - var totalOvercommissionedBurst = 0L - // The burst that was lost due to interference. - var totalInterferedBurst = 0L - - val vmIterator = vms.iterator() - while (vmIterator.hasNext()) { - val vm = vmIterator.next() - - // Apply performance interference model - val performanceModel = - vm.ctx.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0 - var hasFinished = false - - for (vcpu in vm.vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.allocatedLimit / totalAllocatedUsage - - // Compute the burst time that the VM was actually granted - val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() - - // The burst that was actually used by the VM - val usedBurst = ceil(grantedBurst * performanceScore).toLong() - - totalInterferedBurst += grantedBurst - usedBurst - - // Compute remaining burst time to be executed for the request - if (vcpu.consume(usedBurst)) { - hasFinished = true - } else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) { - // Request must have its entire burst consumed or otherwise we have overcommission - // Note that we count the overcommissioned burst if the hypervisor has failed. - totalOvercommissionedBurst += vcpu.burst - } - } - - if (hasFinished || vm.deadline <= end) { - // Mark the VM as finished and deschedule the VMs if needed - if (vm.finish()) { - vmIterator.remove() - vcpus.removeAll(vm.vcpus) - } - } - } - - eventFlow.emit( - HypervisorEvent.SliceFinished( - this@SimpleVirtDriver, - totalRequestedBurst, - min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing - totalOvercommissionedBurst, - totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, - min( - totalAllocatedUsage, - totalRequestedUsage - ), // The allocated usage might be slightly higher due to FP rounding - totalRequestedUsage, - vmCount, // Some VMs might already have finished, so keep initial VM count - server - ) - ) - } - } - - /** - * A virtual machine running on the hypervisor. - * - * @param ctx The execution context the vCPU runs in. - * @param triggerMode The mode when to trigger the VM exit. - * @param merge The function to merge consecutive slices on spillover. - * @param select The function to select on finish. - */ - @OptIn(InternalCoroutinesApi::class) - private data class Vm( - val ctx: VmServerContext, - var triggerMode: ServerContext.TriggerMode = ServerContext.TriggerMode.FIRST, - var merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice = { _, r -> r }, - var select: () -> Unit = {} - ) { - /** - * The vCPUs of this virtual machine. - */ - val vcpus: List<VCpu> - - /** - * The slices that the VM wants to run. - */ - var queue: Iterator<ServerContext.Slice> = emptyList<ServerContext.Slice>().iterator() - - /** - * The current active slice. - */ - var activeSlice: ServerContext.Slice? = null - - /** - * The current deadline of the VM. - */ - val deadline: Long - get() = activeSlice?.deadline ?: Long.MAX_VALUE - - /** - * A flag to indicate that the VM is idle. - */ - val isIdle: Boolean - get() = activeSlice == null - - init { - vcpus = ctx.cpus.mapIndexed { i, model -> VCpu(this, model, i) } - } - - /** - * Schedule the given slices on this vCPU, replacing the existing slices. - */ - fun schedule(slices: Sequence<ServerContext.Slice>) { - queue = slices.iterator() - - if (queue.hasNext()) { - activeSlice = queue.next() - vcpus.forEach { it.refresh() } - } - } - - /** - * Cancel the existing workload on the VM. - */ - fun cancel() { - queue = emptyList<ServerContext.Slice>().iterator() - activeSlice = null - vcpus.forEach { it.refresh() } - } - - /** - * Finish the current slice of the VM. - * - * @return `true` if the vCPUs may be descheduled, `false` otherwise. - */ - fun finish(): Boolean { - val activeSlice = activeSlice ?: return true - - return if (queue.hasNext()) { - val needsMerge = activeSlice.burst.any { it > 0 } - val candidateSlice = queue.next() - val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice - - this.activeSlice = slice - - // Update the vCPU cache - vcpus.forEach { it.refresh() } - - false - } else { - this.activeSlice = null - select() - true - } - } - } - - /** - * A virtual CPU that can be scheduled on a physical CPU. - * - * @param vm The VM of which this vCPU is part. - * @param model The model of CPU that this vCPU models. - * @param id The id of the vCPU with respect to the VM. - */ - private data class VCpu( - val vm: Vm, - val model: ProcessingUnit, - val id: Int - ) : Comparable<VCpu> { - /** - * The current limit on the vCPU. - */ - var limit: Double = 0.0 - - /** - * The limit allocated by the hypervisor. - */ - var allocatedLimit: Double = 0.0 - - /** - * The current burst running on the vCPU. - */ - var burst: Long = 0L - - /** - * Consume the specified burst on this vCPU. - */ - fun consume(burst: Long): Boolean { - this.burst = max(0, this.burst - burst) - - // Flush the result to the slice if it exists - vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) - - return allocatedLimit > 0.0 && this.burst == 0L - } - - /** - * Refresh the information of this vCPU based on the current slice. - */ - fun refresh() { - limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 - burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 - } - - /** - * Compare to another vCPU based on the current load of the vCPU. - */ - override fun compareTo(other: VCpu): Int { - var cmp = limit.compareTo(other.limit) - - if (cmp != 0) { - return cmp - } - - cmp = vm.ctx.server.uid.compareTo(other.vm.ctx.server.uid) - - if (cmp != 0) { - return cmp - } - - return id.compareTo(other.id) - } - - /** - * Create a string representation of the vCPU. - */ - override fun toString(): String = - "vCPU(vm=${vm.ctx.server.uid},id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" - } - - /** - * The execution context in which a VM runs. - * - * @param server The details of the VM. - * @param events The event stream to publish to. - */ - private inner class VmServerContext(server: Server, val events: EventFlow<ServerEvent>) : - ServerManagementContext, - DisposableHandle { - private var finalized: Boolean = false - private var initialized: Boolean = false - private val vm: Vm - - internal val job: Job = launch { - delay(1) // TODO Introduce boot time - init() - try { - server.image(this@VmServerContext) - exit() - } catch (cause: Throwable) { - exit(cause) - } - } - - override var server: Server = server - set(value) { - if (field.state != value.state) { - events.emit(ServerEvent.StateChanged(value, field.state)) - } - - field = value - } - - override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) - - override val clock: Clock - get() = hostContext.clock - - init { - vm = Vm(this) - } - - override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { - server = server.copy(services = server.services.put(key, service)) - events.emit(ServerEvent.ServicePublished(server, key)) - } - - override suspend fun init() { - assert(!finalized) { "VM is already finalized" } - - server = server.copy(state = ServerState.ACTIVE) - initialized = true - } - - override suspend fun exit(cause: Throwable?) { - finalized = true - - val serverState = - if (cause == null || (cause is ShutdownException && cause.cause == null)) - ServerState.SHUTOFF - else - ServerState.ERROR - server = server.copy(state = serverState) - availableMemory += server.flavor.memorySize - vms.remove(this) - vmStopped(server) - eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) - events.close() - } - - @OptIn(InternalCoroutinesApi::class) - override fun onRun( - batch: Sequence<ServerContext.Slice>, - triggerMode: ServerContext.TriggerMode, - merge: (ServerContext.Slice, ServerContext.Slice) -> ServerContext.Slice - ): SelectClause0 = object : SelectClause0 { - @InternalCoroutinesApi - override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - vm.triggerMode = triggerMode - vm.merge = merge - vm.select = { - if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - vm.schedule(batch) - // Indicate to the hypervisor that the VM should be re-scheduled - schedulingQueue.offer(SchedulerCommand.Schedule(vm)) - select.disposeOnSelect(this@VmServerContext) - } - } - - override fun dispose() { - if (!vm.isIdle) { - vm.cancel() - schedulingQueue.offer(SchedulerCommand.Deschedule(vm)) - } - } - } -} diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index f021d3f0..e7c2b8cc 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -32,16 +32,17 @@ import org.opendc.compute.core.Server import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState import org.opendc.compute.core.image.Image -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.virt.HypervisorEvent -import org.opendc.compute.virt.HypervisorImage import org.opendc.compute.virt.driver.InsufficientMemoryOnServerException +import org.opendc.compute.virt.driver.SimVirtDriver +import org.opendc.compute.virt.driver.SimVirtDriverWorkload import org.opendc.compute.virt.driver.VirtDriver import org.opendc.compute.virt.service.allocation.AllocationPolicy -import org.opendc.core.services.ServiceKey import org.opendc.utils.flow.EventFlow import java.time.Clock +import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.math.max @@ -100,14 +101,25 @@ public class SimpleVirtProvisioningService( coroutineScope.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> - val hypervisorImage = HypervisorImage - val node = provisioningService.deploy(node, hypervisorImage) - node.server!!.events.onEach { event -> - when (event) { - is ServerEvent.StateChanged -> stateChanged(event.server) - is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) - } - }.launchIn(this) + val workload = SimVirtDriverWorkload() + val hypervisorImage = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), workload) + launch { + var init = false + val deployedNode = provisioningService.deploy(node, hypervisorImage) + deployedNode.server!!.events.onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + if (!init) { + init = true + } + stateChanged(event.server) + } + } + }.launchIn(this) + + delay(1) + onHypervisorAvailable(deployedNode.server, workload.driver) + } } } } @@ -119,7 +131,7 @@ public class SimpleVirtProvisioningService( override suspend fun deploy( name: String, image: Image, - flavor: org.opendc.compute.core.Flavor + flavor: Flavor ): Server { eventFlow.emit( VirtProvisioningEvent.MetricsAvailable( @@ -171,7 +183,7 @@ public class SimpleVirtProvisioningService( val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - val requiredMemory = (imageInstance.image as VmImage).requiredMemory + val requiredMemory = imageInstance.image.tags["required-memory"] as Long val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) if (selectedHv == null) { @@ -339,41 +351,39 @@ public class SimpleVirtProvisioningService( } } - private fun servicePublished(server: Server, key: ServiceKey<*>) { - if (key == VirtDriver.Key) { - val hv = hypervisors[server] ?: return - hv.driver = server.services[VirtDriver] - availableHypervisors += hv - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) + private fun onHypervisorAvailable(server: Server, hypervisor: SimVirtDriver) { + val hv = hypervisors[server] ?: return + hv.driver = hypervisor + availableHypervisors += hv + + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms ) + ) - hv.driver.events - .onEach { event -> - if (event is HypervisorEvent.VmsUpdated) { - hv.numberOfActiveServers = event.numberOfActiveServers - hv.availableMemory = event.availableMemory - } - }.launchIn(coroutineScope) + hv.driver.events + .onEach { event -> + if (event is HypervisorEvent.VmsUpdated) { + hv.numberOfActiveServers = event.numberOfActiveServers + hv.availableMemory = event.availableMemory + } + }.launchIn(coroutineScope) - requestCycle() - } + requestCycle() } public data class ImageView( public val name: String, public val image: Image, - public val flavor: org.opendc.compute.core.Flavor, + public val flavor: Flavor, public val continuation: Continuation<Server>, public var server: Server? = null ) diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/ComparableAllocationPolicyLogic.kt index 8803009d..c971cd7e 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/ComparableAllocationPolicyLogic.kt @@ -22,7 +22,6 @@ package org.opendc.compute.virt.service.allocation -import org.opendc.compute.core.image.VmImage import org.opendc.compute.virt.service.HypervisorView import org.opendc.compute.virt.service.SimpleVirtProvisioningService @@ -41,7 +40,7 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { ): HypervisorView? { return hypervisors.asSequence() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.image as VmImage).requiredMemory + val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount fitsMemory && fitsCpu } diff --git a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/RandomAllocationPolicy.kt index d143ffe6..26e61673 100644 --- a/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/src/main/kotlin/org/opendc/compute/virt/service/allocation/RandomAllocationPolicy.kt @@ -22,7 +22,6 @@ package org.opendc.compute.virt.service.allocation -import org.opendc.compute.core.image.VmImage import org.opendc.compute.virt.service.HypervisorView import org.opendc.compute.virt.service.SimpleVirtProvisioningService import kotlin.random.Random @@ -39,7 +38,7 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al ): HypervisorView? { return hypervisors.asIterable() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.image as VmImage).requiredMemory + val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount fitsMemory && fitsCpu } diff --git a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/core/image/FlopsApplicationImageTest.kt deleted file mode 100644 index 8870f138..00000000 --- a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/core/image/FlopsApplicationImageTest.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.image - -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import java.util.UUID - -/** - * Test suite for [FlopsApplicationImage] - */ -@DisplayName("FlopsApplicationImage") -internal class FlopsApplicationImageTest { - @Test - fun `flops must be non-negative`() { - assertThrows<IllegalArgumentException>("FLOPs must be non-negative") { - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), -1, 1) - } - } - - @Test - fun `cores cannot be zero`() { - assertThrows<IllegalArgumentException>("Cores cannot be zero") { - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 0) - } - } - - @Test - fun `cores cannot be negative`() { - assertThrows<IllegalArgumentException>("Cores cannot be negative") { - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, -1) - } - } - - @Test - fun `utilization cannot be zero`() { - assertThrows<IllegalArgumentException>("Utilization cannot be zero") { - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 1, 0.0) - } - } - - @Test - fun `utilization cannot be negative`() { - assertThrows<IllegalArgumentException>("Utilization cannot be negative") { - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 1, -1.0) - } - } - - @Test - fun `utilization cannot be larger than one`() { - assertThrows<IllegalArgumentException>("Utilization cannot be larger than one") { - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1, 1, 2.0) - } - } -} diff --git a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt deleted file mode 100644 index 23c9408d..00000000 --- a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.metal.driver - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.withContext -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.compute.core.ProcessingNode -import org.opendc.compute.core.ProcessingUnit -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.image.FlopsApplicationImage -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.util.UUID - -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimpleBareMetalDriverTest { - /** - * A smoke test for the bare-metal driver. - */ - @Test - fun smoke() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - var finalState: ServerState = ServerState.BUILD - testScope.launch { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) } - val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 4_000, 2, utilization = 1.0) - - // Batch driver commands - withContext(coroutineContext) { - driver.init() - driver.setImage(image) - val server = driver.start().server!! - driver.usage - .onEach { println("${clock.millis()} $it") } - .launchIn(this) - server.events.collect { event -> - when (event) { - is ServerEvent.StateChanged -> { - println("${clock.millis()} $event") - finalState = event.server.state - } - } - } - } - } - - testScope.advanceUntilIdle() - assertEquals(ServerState.SHUTOFF, finalState) - } -} diff --git a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/HypervisorTest.kt deleted file mode 100644 index 82a82044..00000000 --- a/simulator/opendc-compute/src/test/kotlin/org/opendc/compute/virt/HypervisorTest.kt +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.virt - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ProcessingNode -import org.opendc.compute.core.ProcessingUnit -import org.opendc.compute.core.image.FlopsApplicationImage -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage -import org.opendc.compute.metal.driver.SimpleBareMetalDriver -import org.opendc.compute.virt.driver.VirtDriver -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.util.UUID - -/** - * Basic test-suite for the hypervisor. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class HypervisorTest { - /** - * A smoke test for the bare-metal driver. - */ - @OptIn(ExperimentalCoroutinesApi::class) - @Test - fun smoke() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - testScope.launch { - val vmm = HypervisorImage - val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) - val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) - - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) - val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = - SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - - metalDriver.init() - metalDriver.setImage(vmm) - val node = metalDriver.start() - node.server?.events?.onEach { println(it) }?.launchIn(this) - - delay(5) - - val flavor = Flavor(1, 0) - val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] - vmDriver.events.onEach { println(it) }.launchIn(this) - val vmA = vmDriver.spawn("a", workloadA, flavor) - vmA.events.onEach { println(it) }.launchIn(this) - val vmB = vmDriver.spawn("b", workloadB, flavor) - vmB.events.onEach { println(it) }.launchIn(this) - } - - testScope.advanceUntilIdle() - } - - /** - * Test overcommissioning of a hypervisor. - */ - @Test - fun overcommission() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - var requestedBurst = 0L - var grantedBurst = 0L - var overcommissionedBurst = 0L - - testScope.launch { - val vmm = HypervisorImage - val duration = 5 * 60L - val vmImageA = VmImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) - ), - 2, - 0 - ) - val vmImageB = VmImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) - ), - 2, - 0 - ) - - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } - val metalDriver = - SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - - metalDriver.init() - metalDriver.setImage(vmm) - metalDriver.start() - - delay(5) - - val flavor = Flavor(2, 0) - val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] - vmDriver.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> { - requestedBurst += event.requestedBurst - grantedBurst += event.grantedBurst - overcommissionedBurst += event.overcommissionedBurst - } - } - } - .launchIn(this) - - vmDriver.spawn("a", vmImageA, flavor) - vmDriver.spawn("b", vmImageB, flavor) - } - - testScope.advanceUntilIdle() - - assertAll( - { assertEquals(2073600, requestedBurst, "Requested Burst does not match") }, - { assertEquals(2013600, grantedBurst, "Granted Burst does not match") }, - { assertEquals(60000, overcommissionedBurst, "Overcommissioned Burst does not match") } - ) - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index b12b1ae3..9cf72f18 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -37,7 +37,10 @@ dependencies { implementation(project(":opendc-format")) implementation(project(":opendc-workflows")) implementation(project(":opendc-simulator:opendc-simulator-core")) - implementation(kotlin("stdlib")) + implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { + exclude("org.jetbrains.kotlin", module = "kotlin-reflect") + } + implementation(kotlin("reflect")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts index 1a7c2f78..c0992fc9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { api(project(":opendc-core")) implementation(project(":opendc-format")) implementation(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation("com.github.ajalt:clikt:2.6.0") implementation("me.tongfei:progressbar:0.8.1") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 97d7d5da..7b6c4880 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -40,7 +40,7 @@ import org.opendc.compute.metal.NODE_CLUSTER import org.opendc.compute.metal.driver.BareMetalDriver import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.virt.HypervisorEvent -import org.opendc.compute.virt.driver.SimpleVirtDriver +import org.opendc.compute.virt.driver.SimVirtDriver import org.opendc.compute.virt.service.SimpleVirtProvisioningService import org.opendc.compute.virt.service.VirtProvisioningEvent import org.opendc.compute.virt.service.allocation.AllocationPolicy @@ -171,8 +171,9 @@ public suspend fun attachMonitor( // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) - hypervisor.server.events + val server = (hypervisor as SimVirtDriver).server + monitor.reportHostStateChange(clock.millis(), hypervisor, server) + server.events .onEach { event -> val time = clock.millis() when (event) { @@ -242,8 +243,8 @@ public suspend fun processTrace( workload.image.name, workload.image, Flavor( - workload.image.maxCores, - workload.image.requiredMemory + workload.image.tags["cores"] as Int, + workload.image.tags["required-memory"] as Long ) ) // Monitor server events diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 5a336865..39b00a61 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.sc20.trace -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload @@ -73,13 +73,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) val newImage = - VmImage( + SimWorkloadImage( image.uid, image.name, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - image.flopsHistory, - image.maxCores, - image.requiredMemory + image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + (image as SimWorkloadImage).workload ) val newWorkload = entry.workload.copy(image = newImage) Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 8965cf43..b40cad7c 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -26,12 +26,12 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File import java.util.UUID @@ -47,12 +47,12 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the fragments into memory. */ - private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> { + private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> { val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) .disableCompatibility() .build() - val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>() + val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() return try { while (true) { @@ -65,7 +65,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { val cpuUsage = record["cpuUsage"] as Double val flops = record["flops"] as Long - val fragment = FlopsHistoryFragment( + val fragment = SimTraceWorkload.Fragment( tick, flops, duration, @@ -85,7 +85,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> { + private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> { val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) .disableCompatibility() .build() @@ -114,17 +114,17 @@ public class Sc20RawParquetTraceReader(private val path: File) { uid, id, UnnamedUser, - VmImage( + SimWorkloadImage( uid, id, mapOf( "submit-time" to submissionTime, "end-time" to endTime, - "total-load" to totalLoad + "total-load" to totalLoad, + "cores" to maxCores, + "required-memory" to requiredMemory ), - vmFragments, - maxCores, - requiredMemory + SimTraceWorkload(vmFragments) ) ) entries.add(TraceEntryImpl(submissionTime, vmWorkload)) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 41dc4b49..2ec01606 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -31,14 +31,14 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File import java.io.Serializable import java.util.SortedSet @@ -71,7 +71,7 @@ public class Sc20StreamingParquetTraceReader( /** * The intermediate buffer to store the read records in. */ - private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024) + private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024) /** * An optional filter for filtering the selected VMs @@ -92,7 +92,7 @@ public class Sc20StreamingParquetTraceReader( /** * A poisonous fragment. */ - private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0, 0, 0.0, 0)) /** * The thread to read the records in. @@ -119,7 +119,7 @@ public class Sc20StreamingParquetTraceReader( val cpuUsage = record["cpuUsage"] as Double val flops = record["flops"] as Long - val fragment = FlopsHistoryFragment( + val fragment = SimTraceWorkload.Fragment( tick, flops, duration, @@ -139,12 +139,12 @@ public class Sc20StreamingParquetTraceReader( /** * Fill the buffers with the VMs */ - private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) { + private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) { if (!hasNext) { return } - val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>() + val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>() queue.drainTo(fragments) for ((id, fragment) in fragments) { @@ -167,7 +167,7 @@ public class Sc20StreamingParquetTraceReader( init { val takenIds = mutableSetOf<UUID>() val entries = mutableMapOf<String, GenericData.Record>() - val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>() + val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() @@ -200,10 +200,10 @@ public class Sc20StreamingParquetTraceReader( logger.info("Processing VM $id") - val internalBuffer = mutableListOf<FlopsHistoryFragment>() - val externalBuffer = mutableListOf<FlopsHistoryFragment>() + val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>() + val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence<FlopsHistoryFragment> { + val fragments = sequence { repeat@ while (true) { if (externalBuffer.isEmpty()) { if (hasNext) { @@ -220,7 +220,7 @@ public class Sc20StreamingParquetTraceReader( for (fragment in internalBuffer) { yield(fragment) - if (fragment.tick >= endTime) { + if (fragment.time >= endTime) { break@repeat } } @@ -239,13 +239,15 @@ public class Sc20StreamingParquetTraceReader( uid, "VM Workload $id", UnnamedUser, - VmImage( + SimWorkloadImage( uid, id, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - fragments, - maxCores, - requiredMemory + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(fragments), ) ) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt index 19da97bb..e8cc2c36 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.sc20.trace import mu.KotlinLogging -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.VmWorkload import org.opendc.experiments.sc20.experiment.model.CompositeWorkload import org.opendc.experiments.sc20.experiment.model.SamplingStrategy @@ -197,13 +197,11 @@ public fun sampleHpcWorkload( */ private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> { val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = VmImage( + val image = SimWorkloadImage( id, entry.workload.image.name, entry.workload.image.tags, - entry.workload.image.flopsHistory, - entry.workload.image.maxCores, - entry.workload.image.requiredMemory + (entry.workload.image as SimWorkloadImage).workload ) val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) return VmTraceEntry(vmWorkload, entry.submissionTime) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml index 5ce99dfb..8029092e 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -30,7 +30,7 @@ </Console> </Appenders> <Loggers> - <Logger name="org.opendc" level="warn" additivity="false"> + <Logger name="org.opendc" level="debug" additivity="false"> <AppenderRef ref="Console"/> </Logger> <Logger name="org.opendc.experiments.sc20" level="info" additivity="false"> diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt index 230e7f36..ff6e4bb9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -142,12 +142,14 @@ class Sc20IntegrationTest { runSimulation() // Note that these values have been verified beforehand - assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") - assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") - assertEquals(207379117949, monitor.totalRequestedBurst) - assertEquals(203388071813, monitor.totalGrantedBurst) - assertEquals(3991046136, monitor.totalOvercommissionedBurst) - assertEquals(0, monitor.totalInterferedBurst) + assertAll( + { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(207379117949, monitor.totalRequestedBurst) }, + { assertEquals(203388071813, monitor.totalGrantedBurst) }, + { assertEquals(3991046136, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) } @Test diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts index 38fcb329..3ab704bd 100644 --- a/simulator/opendc-format/build.gradle.kts +++ b/simulator/opendc-format/build.gradle.kts @@ -31,7 +31,8 @@ dependencies { api(project(":opendc-core")) api(project(":opendc-compute")) api(project(":opendc-workflows")) - api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { + implementation(project(":opendc-simulator:opendc-simulator-compute")) + implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { exclude("org.jetbrains.kotlin", module = "kotlin-reflect") } implementation(kotlin("reflect")) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 027548a8..3fde1a6c 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -26,10 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.MemoryUnit -import org.opendc.compute.core.ProcessingNode -import org.opendc.compute.core.ProcessingUnit -import org.opendc.compute.metal.driver.SimpleBareMetalDriver +import org.opendc.compute.metal.driver.SimBareMetalDriver import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.metal.service.SimpleProvisioningService import org.opendc.core.Environment @@ -37,6 +34,10 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream import java.time.Clock import java.util.* @@ -74,14 +75,13 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimpleBareMetalDriver( + SimBareMetalDriver( coroutineScope, clock, UUID.randomUUID(), "node-${counter++}", emptyMap(), - cores, - listOf(MemoryUnit("", "", 2300.0, 16000)) + SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))) ) } } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 634d5de6..63539d61 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -23,11 +23,8 @@ package org.opendc.format.environment.sc20 import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.MemoryUnit -import org.opendc.compute.core.ProcessingNode -import org.opendc.compute.core.ProcessingUnit import org.opendc.compute.metal.NODE_CLUSTER -import org.opendc.compute.metal.driver.SimpleBareMetalDriver +import org.opendc.compute.metal.driver.SimBareMetalDriver import org.opendc.compute.metal.power.LinearLoadPowerModel import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.metal.service.SimpleProvisioningService @@ -36,6 +33,10 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.File import java.io.FileInputStream import java.io.InputStream @@ -68,7 +69,7 @@ public class Sc20ClusterEnvironmentReader( var memoryPerHost: Long var coresPerHost: Int - val nodes = mutableListOf<SimpleBareMetalDriver>() + val nodes = mutableListOf<SimBareMetalDriver>() val random = Random(0) input.bufferedReader().use { reader -> @@ -102,19 +103,21 @@ public class Sc20ClusterEnvironmentReader( repeat(numberOfHosts) { nodes.add( - SimpleBareMetalDriver( + SimBareMetalDriver( coroutineScope, clock, UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf(NODE_CLUSTER to clusterId), - List(coresPerHost) { coreId -> - ProcessingUnit(unknownProcessingNode, coreId, speed) - }, + SimMachineModel( + List(coresPerHost) { coreId -> + ProcessingUnit(unknownProcessingNode, coreId, speed) + }, + listOf(unknownMemoryUnit) + ), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 - listOf(unknownMemoryUnit), LinearLoadPowerModel(200.0, 350.0) ) ) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 9fd38d13..36136888 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -26,10 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.core.MemoryUnit -import org.opendc.compute.core.ProcessingNode -import org.opendc.compute.core.ProcessingUnit -import org.opendc.compute.metal.driver.SimpleBareMetalDriver +import org.opendc.compute.metal.driver.SimBareMetalDriver import org.opendc.compute.metal.power.LinearLoadPowerModel import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.metal.service.SimpleProvisioningService @@ -38,6 +35,10 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream import java.time.Clock import java.util.* @@ -80,17 +81,16 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimpleBareMetalDriver( + SimBareMetalDriver( coroutineScope, clock, UUID.randomUUID(), "node-${counter++}", emptyMap(), - cores, + SimMachineModel(cores, memories), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 - memories, LinearLoadPowerModel(200.0, 350.0) ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 89d4246d..f98a5a6d 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -22,14 +22,14 @@ package org.opendc.format.trace.bitbrains -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -66,7 +66,7 @@ public class BitbrainsTraceReader( .filterNot { it.isDirectory } .forEach { vmFile -> println(vmFile) - val flopsHistory = mutableListOf<FlopsHistoryFragment>() + val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() var vmId = -1L var cores = -1 var requiredMemory = -1L @@ -99,11 +99,11 @@ public class BitbrainsTraceReader( val flops: Long = (cpuUsage * 5 * 60 * cores).toLong() if (flopsHistory.isEmpty()) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + flopsHistory.add(SimTraceWorkload.Fragment(timestamp, flops, traceInterval, cpuUsage, cores)) } else { if (flopsHistory.last().flops != flops) { flopsHistory.add( - FlopsHistoryFragment( + SimTraceWorkload.Fragment( timestamp, flops, traceInterval, @@ -114,8 +114,8 @@ public class BitbrainsTraceReader( } else { val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) flopsHistory.add( - FlopsHistoryFragment( - oldFragment.tick, + SimTraceWorkload.Fragment( + oldFragment.time, oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, @@ -139,17 +139,19 @@ public class BitbrainsTraceReader( uuid, "VM Workload $vmId", UnnamedUser, - VmImage( + SimWorkloadImage( uuid, vmId.toString(), - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory.asSequence(), - cores, - requiredMemory + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(flopsHistory.asSequence()) ) ) entries[vmId] = TraceEntryImpl( - flopsHistory.firstOrNull()?.tick ?: -1, + flopsHistory.firstOrNull()?.time ?: -1, vmWorkload ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index 9f8fb558..53bc3956 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -22,10 +22,11 @@ package org.opendc.format.trace.gwf -import org.opendc.compute.core.image.FlopsApplicationImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE @@ -137,7 +138,7 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val task = Task( UUID(0L, taskId), "<unnamed>", - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores), + SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)), HashSet(), mapOf(WORKFLOW_TASK_DEADLINE to runtime) ) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index bfcc30ce..560ad846 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -22,14 +22,14 @@ package org.opendc.format.trace.sc20 -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.compute.core.workload.PerformanceInterferenceModel import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -109,7 +109,7 @@ public class Sc20TraceReader( } val flopsFragments = sequence { - var last: FlopsHistoryFragment? = null + var last: SimTraceWorkload.Fragment? = null BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() @@ -130,8 +130,8 @@ public class Sc20TraceReader( last = if (last != null && last!!.flops == 0L && flops == 0L) { val oldFragment = last!! - FlopsHistoryFragment( - oldFragment.tick, + SimTraceWorkload.Fragment( + oldFragment.time, oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, @@ -139,7 +139,7 @@ public class Sc20TraceReader( ) } else { val fragment = - FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) + SimTraceWorkload.Fragment(timestamp, flops, traceInterval, cpuUsage, cores) if (last != null) { yield(last!!) } @@ -165,13 +165,15 @@ public class Sc20TraceReader( uuid, "VM Workload $vmId", UnnamedUser, - VmImage( + SimWorkloadImage( uuid, vmId, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsFragments.asSequence(), - maxCores, - requiredMemory + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(flopsFragments.asSequence()) ) ) entries[uuid] = TraceEntryImpl( diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index 13bd7ac3..5f312d01 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -22,12 +22,12 @@ package org.opendc.format.trace.swf -import org.opendc.compute.core.image.FlopsHistoryFragment -import org.opendc.compute.core.image.VmImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.compute.core.workload.VmWorkload import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -103,7 +103,7 @@ public class SwfTraceReader( memory /= 1000 // convert KB to MB } - val flopsHistory = mutableListOf<FlopsHistoryFragment>() + val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() // Insert waiting time slices @@ -112,7 +112,7 @@ public class SwfTraceReader( if (waitTime >= sliceDuration) { for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { flopsHistory.add( - FlopsHistoryFragment( + SimTraceWorkload.Fragment( tick * 1000L, 0L, sliceDuration * 1000L, @@ -137,7 +137,7 @@ public class SwfTraceReader( step sliceDuration ) { flopsHistory.add( - FlopsHistoryFragment( + SimTraceWorkload.Fragment( tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, @@ -149,7 +149,7 @@ public class SwfTraceReader( if (runtimePartialSliceRemainder > 0) { flopsHistory.add( - FlopsHistoryFragment( + SimTraceWorkload.Fragment( submitTime + (slicedWaitTime + runTime - runtimePartialSliceRemainder), flopsPartialSlice, sliceDuration, @@ -164,13 +164,14 @@ public class SwfTraceReader( uuid, "SWF Workload $jobNumber", UnnamedUser, - VmImage( + SimWorkloadImage( uuid, jobNumber.toString(), - emptyMap(), - flopsHistory.asSequence(), - cores, - memory + mapOf( + "cores" to cores, + "required-memory" to memory + ), + SimTraceWorkload(flopsHistory.asSequence()) ) ) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index de6647d0..09f2764b 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,10 +25,11 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.image.FlopsApplicationImage +import org.opendc.compute.core.image.SimWorkloadImage import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE @@ -81,7 +82,7 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val task = Task( UUID(0L, taskId), "<unnamed>", - FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores), + SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops, cores)), HashSet(), mapOf(WORKFLOW_TASK_DEADLINE to runtime) ) diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt index 40132ad3..78066c61 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -24,6 +24,8 @@ package org.opendc.format.trace.swf import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.opendc.compute.core.image.SimWorkloadImage +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.File class SwfTraceReaderTest { @@ -33,13 +35,13 @@ class SwfTraceReaderTest { var entry = reader.next() assertEquals(0, entry.submissionTime) // 1961 slices for waiting, 3 full and 1 partial running slices - assertEquals(1965, entry.workload.image.flopsHistory.toList().size) + assertEquals(1965, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size) entry = reader.next() assertEquals(164472, entry.submissionTime) // 1188 slices for waiting, 0 full and 1 partial running slices - assertEquals(1189, entry.workload.image.flopsHistory.toList().size) - assertEquals(5_100_000L, entry.workload.image.flopsHistory.toList().last().flops) - assertEquals(0.25, entry.workload.image.flopsHistory.toList().last().usage) + assertEquals(1189, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size) + assertEquals(5_100_000L, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().flops) + assertEquals(0.25, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().usage) } } diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index 594e8197..9e2c5277 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { implementation(project(":opendc-format")) implementation(project(":opendc-experiments:opendc-experiments-sc20")) implementation(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation("com.github.ajalt:clikt:2.8.0") implementation("io.github.microutils:kotlin-logging:1.7.10") diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index de9ece75..cdcf1b08 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -31,11 +31,8 @@ import com.mongodb.client.model.Projections import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document -import org.opendc.compute.core.MemoryUnit -import org.opendc.compute.core.ProcessingNode -import org.opendc.compute.core.ProcessingUnit import org.opendc.compute.metal.NODE_CLUSTER -import org.opendc.compute.metal.driver.SimpleBareMetalDriver +import org.opendc.compute.metal.driver.SimBareMetalDriver import org.opendc.compute.metal.power.LinearLoadPowerModel import org.opendc.compute.metal.service.ProvisioningService import org.opendc.compute.metal.service.SimpleProvisioningService @@ -44,6 +41,10 @@ import org.opendc.core.Platform import org.opendc.core.Zone import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.time.Clock import java.util.* @@ -55,7 +56,7 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p * Parse the topology with the specified [id]. */ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { - val nodes = mutableListOf<SimpleBareMetalDriver>() + val nodes = mutableListOf<SimBareMetalDriver>() val random = Random(0) for (machine in fetchMachines(id)) { @@ -81,14 +82,13 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p ) } nodes.add( - SimpleBareMetalDriver( + SimBareMetalDriver( coroutineScope, clock, UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$position", mapOf(NODE_CLUSTER to clusterId), - processors, - memoryUnits, + SimMachineModel(processors, memoryUnits), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index 7c2cfbe3..55eda70f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -103,14 +103,15 @@ public class SimHypervisor( * Run the scheduling process of the hypervisor. */ override suspend fun run(ctx: SimExecutionContext) { - val maxUsage = ctx.machine.cpus.sumByDouble { it.frequency } - val pCPUs = ctx.machine.cpus.indices.sortedBy { ctx.machine.cpus[it].frequency } + val model = ctx.machine + val maxUsage = model.cpus.sumByDouble { it.frequency } + val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency } val vms = mutableSetOf<VmSession>() val vcpus = mutableListOf<VCpu>() - val usage = DoubleArray(ctx.machine.cpus.size) - val burst = LongArray(ctx.machine.cpus.size) + val usage = DoubleArray(model.cpus.size) + val burst = LongArray(model.cpus.size) fun process(command: SchedulerCommand) { when (command) { @@ -180,13 +181,16 @@ public class SimHypervisor( duration = min(duration, req.burst / grantedUsage) } + // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. + duration = 300.0 + val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L availableUsage = totalAllocatedUsage // Divide the requests over the available capacity of the pCPUs fairly for (i in pCPUs) { - val maxCpuUsage = ctx.machine.cpus[i].frequency + val maxCpuUsage = model.cpus[i].frequency val fraction = maxCpuUsage / maxUsage val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) val grantedBurst = ceil(duration * grantedUsage).toLong() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index e7eaa8bf..7b1ddf32 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -29,7 +29,7 @@ import kotlin.math.min * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceWorkload(private val trace: Sequence<Fragment>) : SimWorkload { +public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkload { override suspend fun run(ctx: SimExecutionContext) { var offset = ctx.clock.millis() diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts index efffe7a1..b30ecd39 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflows/build.gradle.kts @@ -34,6 +34,10 @@ dependencies { testImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-format")) + testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { + exclude("org.jetbrains.kotlin", module = "kotlin-reflect") + } + testImplementation(kotlin("reflect")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") |
