diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-02-17 14:22:30 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-20 22:23:38 +0100 |
| commit | 6660f4170d3afebd7c778dc352cb1a2d55017dc5 (patch) | |
| tree | 679e08e71dd14efc813030c1606d79ff95489478 | |
| parent | 04e4bddccc4e06a126f3c6ee2878502323c7116e (diff) | |
feat: Implement VM support
This change adds support for virtual machines and hypervisors to the
_opendc-compute_ module. Moreover, this change also includes VM trace
reading capabilities.
29 files changed, 880 insertions, 30 deletions
diff --git a/.editorconfig b/.editorconfig index 4b0abe4d..1cdb4ce2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -15,3 +15,7 @@ insert_final_newline = true [*.md] trim_trailing_whitespace = false indent_style = space + +# ktlint +[*.{kt, kts}] +disabled_rules = import-ordering diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index d42b59b6..60340286 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -26,6 +26,8 @@ package com.atlarge.opendc.compute.core import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.core.services.ServiceRegistryImpl import java.util.UUID /** @@ -55,8 +57,12 @@ public data class Server( /** * The last known state of the server. */ - public val state: ServerState + public val state: ServerState, + /** + * The services published by this server. + */ + public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl() ) : Identity { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Server && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/Protocol.kt index 1e4fa0fb..fa40d4a3 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/Protocol.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/Protocol.kt @@ -41,9 +41,9 @@ public sealed class ServerRequest { public object Initialize : ServerRequest() /** - * Request for each core the specified amount of cpu time to run from the server + * Request for each core the specified amount of cpu time to run from the server. */ - public data class Run(public val req: LongArray) : ServerRequest() + public data class Run(public val req: LongArray, public val reqDuration: Long) : ServerRequest() /** * Terminate the execution of the server. @@ -61,9 +61,17 @@ public sealed class ServerResponse { public abstract val server: Server /** + * The amount cpu time granted on this server. + */ + public abstract val rec: LongArray? + + /** * Indicate that this request was processed successfully. */ - public data class Ok(public override val server: Server) : ServerResponse() + public data class Ok( + public override val server: Server, + public override val rec: LongArray? = null + ) : ServerResponse() } /** @@ -86,8 +94,8 @@ public suspend fun ServerManagementContext.serialize(): ServerManagementContext outlet.send(ServerResponse.Ok(server)) } is ServerRequest.Run -> { - run(msg.req) - outlet.send(ServerResponse.Ok(server)) + val rec = run(msg.req, msg.reqDuration) + outlet.send(ServerResponse.Ok(server, rec)) } is ServerRequest.Exit -> { exit(msg.cause) @@ -103,12 +111,13 @@ public suspend fun ServerManagementContext.serialize(): ServerManagementContext override var server: Server = this@serialize.server - override suspend fun run(req: LongArray) { - outlet.send(ServerRequest.Run(req)) + override suspend fun run(req: LongArray, reqDuration: Long): LongArray { + outlet.send(ServerRequest.Run(req, reqDuration)) when (val res = inlet.receive()) { is ServerResponse.Ok -> { server = res.server + return res.rec ?: error("Received should be defined in this type of request.") } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 057ed118..539a991b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -26,6 +26,7 @@ package com.atlarge.opendc.compute.core.execution import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.core.services.AbstractServiceKey /** * Represents the execution context in which an bootable [Image] runs on a [Server]. @@ -41,6 +42,15 @@ public interface ServerContext { * finished processing. If none of the cores are non-zero, the method will return immediately. * * @param req An array specifying for each core the amount of cpu time to request. + * @param reqDuration A [Long] specifying the duration in which this request needs to be fulfilled. + * @return An array specifying for each core the amount of cpu time it actually received. */ - public suspend fun run(req: LongArray) + public suspend fun run(req: LongArray, reqDuration: Long): LongArray + + /** + * Publishes the given [service] with key [serviceKey] in the server's registry. + */ + public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) { + server.serviceRegistry[serviceKey] = service + } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 4519dc49..216a62a2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -52,6 +52,6 @@ class FlopsApplicationImage( override suspend fun invoke(ctx: ServerContext) { val cores = min(this.cores, ctx.server.flavor.cpus.sumBy { it.cores }) val req = (flops * (1 / utilization) / cores).toLong() - ctx.run(LongArray(cores) { req }) + ctx.run(LongArray(cores) { req }, req) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsHistoryFragment.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsHistoryFragment.kt new file mode 100644 index 00000000..320c09ff --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsHistoryFragment.kt @@ -0,0 +1,3 @@ +package com.atlarge.opendc.compute.core.image + +data class FlopsHistoryFragment(val tick: Long, val flops: Long, val duration: Long) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/Image.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/Image.kt index bd4f5648..ff922aa9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/Image.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/Image.kt @@ -33,7 +33,6 @@ import com.atlarge.opendc.compute.core.execution.ServerContext * images by default. You may also create custom images from cloud servers you have launched. These custom images are * useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server * configuration frequently. - */ public interface Image { /** diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt new file mode 100644 index 00000000..82aa28e4 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -0,0 +1,29 @@ +package com.atlarge.opendc.compute.core.image + +import com.atlarge.opendc.compute.core.execution.ServerContext +import kotlinx.coroutines.delay +import kotlin.math.ceil +import kotlin.math.min + +public val VM_SCHEDULING_SLICE_DURATION = 5 * 60 * 1000L + +class VmImage( + public val flopsHistory: List<FlopsHistoryFragment>, + public val cores: Int, + public override val details: Map<String, Any> = emptyMap() +) : Image { + override suspend fun invoke(ctx: ServerContext) { + flopsHistory.forEach { fragment -> + if (fragment.flops == 0L) { + delay(fragment.duration) + } else { + for (time in fragment.tick until fragment.tick + fragment.duration step VM_SCHEDULING_SLICE_DURATION) { + val cores = min(this.cores, ctx.server.flavor.cpus.sumBy { it.cores }) + val req = + (fragment.flops / (ceil(fragment.duration.toDouble() / VM_SCHEDULING_SLICE_DURATION)) / cores).toLong() + ctx.run(LongArray(cores) { req }, VM_SCHEDULING_SLICE_DURATION) + } + } + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt index b16c0d59..fa9f627b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt @@ -43,7 +43,10 @@ public sealed class ServerEvent { /** * A response sent when the bare metal driver has been initialized. */ - public data class StateChanged(public override val server: Server, public val previousState: ServerState) : ServerEvent() + public data class StateChanged( + public override val server: Server, + public val previousState: ServerState + ) : ServerEvent() } /** diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/VmWorkload.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/VmWorkload.kt new file mode 100644 index 00000000..098eb8ca --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/VmWorkload.kt @@ -0,0 +1,25 @@ +package com.atlarge.opendc.compute.core.workload + +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.core.User +import com.atlarge.opendc.core.workload.Workload +import java.util.UUID + +/** + * A workload that represents a VM. + * + * @property uid A unique identified of this VM. + * @property name The name of this VM. + * @property owner The owner of the VM. + * @property image The image of the VM. + */ +data class VmWorkload( + override val uid: UUID, + override val name: String, + override val owner: User, + val image: VmImage +) : Workload { + override fun equals(other: Any?): Boolean = other is VmWorkload && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 4c702ffa..1dde8286 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -34,9 +34,9 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState +import kotlinx.coroutines.delay import java.util.UUID import kotlin.math.max -import kotlinx.coroutines.delay /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -69,7 +69,13 @@ public class SimpleBareMetalDriver( val previousPowerState = node.powerState val server = when (node.powerState to powerState) { PowerState.POWER_OFF to PowerState.POWER_OFF -> null - PowerState.POWER_OFF to PowerState.POWER_ON -> Server(UUID.randomUUID(), node.name, flavor, node.image, ServerState.BUILD) + PowerState.POWER_OFF to PowerState.POWER_ON -> Server( + UUID.randomUUID(), + node.name, + flavor, + node.image, + ServerState.BUILD + ) PowerState.POWER_ON to PowerState.POWER_OFF -> null // TODO Terminate existing image PowerState.POWER_ON to PowerState.POWER_ON -> node.server else -> throw IllegalStateException() @@ -136,10 +142,11 @@ public class SimpleBareMetalDriver( initialized = false } - override suspend fun run(req: LongArray) { + override suspend fun run(req: LongArray, reqDuration: Long): LongArray { // TODO Properly implement this for multiple CPUs val time = max(0, req.max() ?: 0) / (flavor.cpus[0].clockRate * 1000) - delay(time.toLong()) + delay(max(time.toLong(), reqDuration)) + return req } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt new file mode 100644 index 00000000..4f482ab7 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.odcsim.processContext +import com.atlarge.opendc.compute.core.execution.ServerContext +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.suspendCancellableCoroutine + +/** + * A hypervisor managing the VMs of a node. + */ +class HypervisorImage( + private val hypervisorMonitor: HypervisorMonitor, + public override val details: Map<String, Any> = emptyMap() +) : Image { + override suspend fun invoke(ctx: ServerContext) { + val driver = SimpleVirtDriver(processContext, ctx, hypervisorMonitor) + + ctx.publishService(VirtDriver.Key, driver) + + // Suspend image until it is cancelled + suspendCancellableCoroutine<Unit> {} + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorState.kt new file mode 100644 index 00000000..9d76927f --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorState.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +/** + * The power state of a compute node. + */ +public enum class HypervisorState { + /** + * Hypervisor is running. + */ + RUNNING, + + /** + * Hypervisor is destroyed. + */ + DESTROYED, +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/RunRequest.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/RunRequest.kt new file mode 100644 index 00000000..cba8ca78 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/RunRequest.kt @@ -0,0 +1,29 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import kotlin.coroutines.Continuation + +data class RunRequest(val req: LongArray, val reqDuration: Long, val continuation: Continuation<LongArray>) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt new file mode 100644 index 00000000..cc24b49f --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -0,0 +1,123 @@ +package com.atlarge.opendc.compute.virt.driver + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.processContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerFlavor +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.execution.ServerContext +import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.core.image.VM_SCHEDULING_SLICE_DURATION +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.virt.RunRequest +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import java.util.UUID +import kotlin.coroutines.resume +import kotlin.math.min + +class SimpleVirtDriver( + private val ctx: ProcessContext, + private val hostContext: ServerContext, + private val hypervisorMonitor: HypervisorMonitor +) : VirtDriver { + /** + * The contexts of all VMs running on this hypervisor. + */ + private val serverContexts: MutableSet<VmServerContext> = mutableSetOf() + + init { + ctx.launch { + while (isActive) { + val serverFlavor = hostContext.server.flavor + + val requests = serverContexts.map { ctx.async { it.channel.receive() } }.awaitAll() + require(requests.all { it.reqDuration == VM_SCHEDULING_SLICE_DURATION }) + + if (requests.isEmpty()) { + hostContext.run(LongArray(serverFlavor.cpus[0].cores) { 0 }, 5 * 60 * 1000) + } else { + val totalRequested = requests.map { it.req.sum() }.sum() + val capacity = (serverFlavor.cpus[0].cores * serverFlavor.cpus[0].clockRate * 1_000_000L).toLong() + + hypervisorMonitor.onSliceFinish( + processContext.clock.millis(), + totalRequested, + capacity, + serverContexts.size, + hostContext.server + ) + + val satisfiedCapacity = min(capacity, totalRequested) + requests.forEach { request -> + val individualAssignedCapacity = ( + satisfiedCapacity * (request.req.sum().toDouble() / totalRequested) / + request.req.size).toLong() + + request.continuation.resume( + hostContext.run( + LongArray(request.req.size) { individualAssignedCapacity }, + VM_SCHEDULING_SLICE_DURATION + ) + ) + } + } + } + } + } + + override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: ServerFlavor): Server { + val server = Server(UUID.randomUUID(), "<unnamed>", flavor, image, ServerState.BUILD) + val context = VmServerContext(server, monitor, flavor, hostContext, Channel(Channel.CONFLATED)) + serverContexts.add(context) + context.init() + processContext.launch { image(context) } + return server + } + + override suspend fun getNumberOfSpawnedImages(): Int { + return serverContexts.size + } + + class VmServerContext( + override var server: Server, + val monitor: ServerMonitor, + val flavor: ServerFlavor, + val hostContext: ServerContext, + val channel: Channel<RunRequest> + ) : + ServerManagementContext { + private var initialized: Boolean = false + + override suspend fun init() { + if (initialized) { + throw IllegalStateException() + } + + val previousState = server.state + server = server.copy(state = ServerState.ACTIVE) + monitor.onUpdate(server, previousState) + initialized = true + } + + override suspend fun exit(cause: Throwable?) { + val previousState = server.state + val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR + server = server.copy(state = state) + monitor.onUpdate(server, previousState) + initialized = false + } + + override suspend fun run(req: LongArray, reqDuration: Long): LongArray { + return suspendCancellableCoroutine { cont -> + channel.offer(RunRequest(req, reqDuration, cont)) + } + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index 52aa6488..3541b414 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -25,8 +25,11 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerFlavor import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.core.services.AbstractServiceKey +import java.util.UUID /** * A driver interface for a hypervisor running on some host server and communicating with the central compute service to @@ -35,6 +38,20 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor public interface VirtDriver { /** * Spawn the given [Image] on the compute resource of this driver. + * + * @param image The image to deploy. + * @param monitor The monitor to use for the deployment of this particular image. + * @param flavor The flavor of the server which this driver is controlling. + * @return The virtual server spawned by this method. */ - public suspend fun spawn(image: Image, monitor: ServerMonitor): Server + public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: ServerFlavor): Server + + /** + * Returns the number of spawned images on the server managed by this driver. + * + * @return The number of spawned images. + */ + public suspend fun getNumberOfSpawnedImages(): Int + + companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt new file mode 100644 index 00000000..f034cc3f --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt @@ -0,0 +1,25 @@ +package com.atlarge.opendc.compute.virt.monitor + +import com.atlarge.opendc.compute.core.Server + +/** + * Monitoring interface for hypervisor-specific events. + */ +interface HypervisorMonitor { + /** + * Invoked after a scheduling slice has finished processed. + * + * @param time The current time (in ms). + * @param totalRequestedCpuTime The total requested CPU time (can be above capacity). + * @param totalCpuTimeCapacity The actual total capacity of the machine managed by this hypervisor. + * @param numberOfDeployedImages The number of images deployed on this hypervisor. + * @param hostServer The server hosting this hypervisor. + */ + fun onSliceFinish( + time: Long, + totalRequestedCpuTime: Long, + totalCpuTimeCapacity: Long, + numberOfDeployedImages: Int, + hostServer: Server + ) +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt new file mode 100644 index 00000000..2b1eed1e --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -0,0 +1,115 @@ +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.launch + +class SimpleVirtProvisioningService( + private val ctx: ProcessContext, + private val provisioningService: ProvisioningService, + private val hypervisorMonitor: HypervisorMonitor +) : VirtProvisioningService, ServerMonitor { + /** + * The nodes that are controlled by the service. + */ + internal lateinit var nodes: List<Node> + + /** + * The available nodes. + */ + internal val availableNodes: MutableSet<Node> = mutableSetOf() + + /** + * The available hypervisors. + */ + internal val hypervisorByNode: MutableMap<Node, HypervisorImage> = mutableMapOf() + + /** + * The incoming images to be processed by the provisioner. + */ + internal val incomingImages: MutableSet<ImageView> = mutableSetOf() + + /** + * The active images in the system. + */ + internal val activeImages: MutableSet<ImageView> = mutableSetOf() + + /** + * The images hosted on each server. + */ + internal val imagesByServer: MutableMap<Server, MutableSet<ImageView>> = mutableMapOf() + + init { + ctx.launch { + val provisionedNodes = provisioningService.nodes().toList() + val deployedNodes = provisionedNodes.map { node -> + val hypervisorImage = HypervisorImage(hypervisorMonitor) + hypervisorByNode[node] = hypervisorImage + provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) + } + nodes = deployedNodes + availableNodes.addAll(deployedNodes) + } + } + + override suspend fun deploy(image: Image, monitor: ServerMonitor) { + val vmInstance = ImageView(image, monitor) + incomingImages += vmInstance + requestCycle() + } + + private fun requestCycle() { + ctx.launch { + schedule() + } + } + + private suspend fun schedule() { + val imagesToBeScheduled = incomingImages.toSet() + + for (imageInstance in imagesToBeScheduled) { + println("Spawning $imageInstance") + + val selectedNode = availableNodes.minBy { + it.server!!.serviceRegistry[VirtDriver.Key].getNumberOfSpawnedImages() + } + + imageInstance.server = selectedNode?.server!!.serviceRegistry[VirtDriver.Key].spawn( + imageInstance.image, + imageInstance.monitor, + nodes[0].server!!.flavor + ) + + incomingImages -= imageInstance + activeImages += imageInstance + imagesByServer.putIfAbsent(imageInstance.server!!, mutableSetOf()) + imagesByServer[imageInstance.server!!]!!.add(imageInstance) + } + } + + override suspend fun onUpdate(server: Server, previousState: ServerState) { + when (server.state) { + ServerState.ACTIVE -> { + // TODO handle hypervisor server becoming active + } + ServerState.SHUTOFF, ServerState.ERROR -> { + // TODO handle hypervisor server shutting down or failing + } + else -> throw IllegalStateException() + } + } + + class ImageView( + val image: Image, + val monitor: ServerMonitor, + var server: Server? = null + ) +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt new file mode 100644 index 00000000..8e0e1137 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -0,0 +1,14 @@ +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.core.monitor.ServerMonitor + +/** + * A service for VM provisioning on a cloud. + */ +interface VirtProvisioningService { + /** + * Submit the specified [Image] to the provisioning service. + */ + public suspend fun deploy(image: Image, monitor: ServerMonitor) +} diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index c57d6eca..80ad18c5 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -32,11 +32,11 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.PowerState -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID internal class SimpleBareMetalDriverTest { /** @@ -45,7 +45,7 @@ internal class SimpleBareMetalDriverTest { @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ ctx -> + val system = provider({ _ -> val flavor = ServerFlavor(listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4))) val image = FlopsApplicationImage(1000, 2) val monitor = object : ServerMonitor { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 0f9cbd7f..9c5f97aa 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -32,11 +32,11 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID /** * Test suite for the [SimpleProvisioningService]. @@ -48,7 +48,7 @@ internal class SimpleProvisioningServiceTest { @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ ctx -> + val system = provider({ _ -> val flavor = ServerFlavor(listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4))) val image = FlopsApplicationImage(1000, 2) val monitor = object : ServerMonitor { diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 9ed4e369..3dc8be51 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -39,12 +39,12 @@ import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task -import java.io.File -import java.util.ServiceLoader -import kotlin.math.max import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import java.io.File +import java.util.ServiceLoader +import kotlin.math.max /** * Main entry point of the experiment. diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts new file mode 100644 index 00000000..348f6f77 --- /dev/null +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Experiments for the SC20 paper" + +/* Build configuration */ +plugins { + `kotlin-library-convention` + application +} + +application { + mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperiment" +} + +dependencies { + api(project(":opendc:opendc-core")) + implementation(project(":opendc:opendc-format")) + implementation(kotlin("stdlib")) + + runtimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") + testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt new file mode 100644 index 00000000..5a277dff --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt @@ -0,0 +1,23 @@ +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import java.io.File + +class Sc20HypervisorMonitor : HypervisorMonitor { + private val outputFile = File("sc20-experiment-results.csv") + + init { + outputFile.appendText("time,totalRequestedCpuTime,totalCpuTimeCapacity,numberOfDeployedImages,server\n") + } + + override fun onSliceFinish( + time: Long, + totalRequestedCpuTime: Long, + totalCpuTimeCapacity: Long, + numberOfDeployedImages: Int, + hostServer: Server + ) { + outputFile.appendText("$time,$totalRequestedCpuTime,$totalCpuTimeCapacity,$numberOfDeployedImages,$numberOfDeployedImages,${hostServer.uid}\n") + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt new file mode 100644 index 00000000..c7d7ac51 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -0,0 +1,88 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader +import com.atlarge.opendc.format.trace.vm.VmTraceReader +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import java.io.File +import java.util.ServiceLoader +import kotlin.math.max + +/** + * Main entry point of the experiment. + */ +fun main(args: Array<String>) { + if (args.isEmpty()) { + println("error: Please provide path to directory containing VM trace files") + return + } + + val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) + .use { it.read() } + + val token = Channel<Boolean>() + + val monitor = object : ServerMonitor { + override suspend fun onUpdate(server: Server, previousState: ServerState) { + println(server) + } + } + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider({ ctx -> + println(ctx.clock.instant()) + val scheduler = SimpleVirtProvisioningService( + ctx, + environment.platforms[0].zones[0].services[ProvisioningService.Key], + Sc20HypervisorMonitor() + ) + + val reader = VmTraceReader(File(args[0])) + delay(1376314846 * 1000L) + while (reader.hasNext()) { + val (time, workload) = reader.next() + delay(max(0, time * 1000 - ctx.clock.millis())) + scheduler.deploy(workload.image, monitor) + } + + token.receive() + + println(ctx.clock.instant()) + }, name = "sim") + + runBlocking { + system.run() + system.terminate() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json new file mode 100644 index 00000000..0965b250 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json @@ -0,0 +1,36 @@ +{ + "name": "Experimental Setup 2", + "rooms": [ + { + "type": "SERVER", + "objects": [ + { + "type": "RACK", + "machines": [ + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]} + ] + }, + { + "type": "RACK", + "machines": [ + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]} + ] + } + ] + } + ] +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 4d2f9e85..55061492 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -37,9 +37,9 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.runBlocking import java.io.InputStream import java.util.UUID -import kotlinx.coroutines.runBlocking /** * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Datacenter @@ -87,9 +87,11 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb val serviceRegistry = ServiceRegistryImpl() serviceRegistry[ProvisioningService.Key] = provisioningService - val platform = Platform(UUID.randomUUID(), "sc18-platform", listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - )) + val platform = Platform( + UUID.randomUUID(), "sc18-platform", listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) environment = Environment(setup.name, null, listOf(platform)) } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt new file mode 100644 index 00000000..b5c6ca0d --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -0,0 +1,146 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.vm + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for the VM workload trace format. + * + * @param traceDirectory The directory of the traces. + */ +class VmTraceReader(traceDirectory: File) : TraceReader<VmWorkload> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<VmWorkload>> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>() + + var timestampCol = 0 + var coreCol = 0 + var cpuUsageCol = 0 + val traceInterval = 5 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .forEach { vmFile -> + println(vmFile) + val flopsHistory = mutableListOf<FlopsHistoryFragment>() + var vmId = -1L + var cores = -1 + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";\t") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + timestampCol = header["Timestamp [ms]"]!! + coreCol = header["CPU cores"]!! + cpuUsageCol = header["CPU usage [MHZ]"]!! + return@forEachIndexed + } + + vmId = vmFile.nameWithoutExtension.trim().toLong() + val timestamp = values[timestampCol].trim().toLong() - 5 * 60 + cores = values[coreCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() + + val flops: Long = (cpuUsage * cores * 1_000_000L * 5 * 60).toLong() + + if (flopsHistory.isEmpty()) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval)) + } else { + if (flopsHistory.last().flops != flops) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval)) + } else { + val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) + flopsHistory.add( + FlopsHistoryFragment( + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval + ) + ) + } + } + } + } + + val vmWorkload = VmWorkload( + UUID(0L, vmId), "<unnamed>", UnnamedUser, + VmImage(flopsHistory, cores) + ) + entries[vmId] = TraceEntryImpl( + flopsHistory.firstOrNull()?.tick ?: -1, + vmWorkload + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<VmWorkload> = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "<unnamed>" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry<VmWorkload> +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c7dc5f1b..677a9817 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -30,3 +30,4 @@ include(":opendc:opendc-compute") include(":opendc:opendc-format") include(":opendc:opendc-workflows") include(":opendc:opendc-experiments-sc18") +include(":opendc:opendc-experiments-sc20") |
