diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-02-28 14:34:45 +0100 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-02-28 14:34:45 +0100 |
| commit | 3a5eac673fb67a6cff7fc79f16312db78d706322 (patch) | |
| tree | 6c0e37e994d0a1ada6cef8d42d7dfbd9cdde3ccc /opendc/opendc-compute/src/main | |
| parent | 0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff) | |
| parent | 2ed1e47b5d82229a873febebb2d8bd3d8f5832ea (diff) | |
Merge branch 'refactor/domains' into 'feat/2.x'
Change from logical processes to simulation domains
See merge request opendc/opendc-simulator!28
Diffstat (limited to 'opendc/opendc-compute/src/main')
8 files changed, 58 insertions, 271 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 4e8162ec..436f4653 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.compute.core.image -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.coroutineScope @@ -28,7 +28,7 @@ class VmImage( coroutineScope { for (cpu in ctx.cpus.take(cores)) { val usage = req / (fragment.usage * 1_000_000L) - launch { cpu.run(req, usage, processContext.clock.millis() + fragment.duration) } + launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt deleted file mode 100644 index fa9f627b..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.monitor - -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch - -/** - * Events emitted by a [Server] instance. - */ -public sealed class ServerEvent { - /** - * The server that emitted this event. - */ - public abstract val server: Server - - /** - * A response sent when the bare metal driver has been initialized. - */ - public data class StateChanged( - public override val server: Server, - public val previousState: ServerState - ) : ServerEvent() -} - -/** - * Serialize the specified [ServerMonitor] instance in order to safely send this object across logical processes. - */ -public suspend fun ServerMonitor.serialize(): ServerMonitor { - val ctx = processContext - val input = ctx.open<ServerEvent>() - - ctx.launch { - val inlet = processContext.listen(input.receive) - - while (isActive) { - when (val msg = inlet.receive()) { - is ServerEvent.StateChanged -> onUpdate(msg.server, msg.previousState) - } - } - } - - return object : ServerMonitor { - private var outlet: SendPort<ServerEvent>? = null - - override suspend fun onUpdate(server: Server, previousState: ServerState) { - if (outlet == null) { - outlet = processContext.connect(input.send) - } - - outlet!!.send(ServerEvent.StateChanged(server, previousState)) - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt deleted file mode 100644 index a8996f61..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt +++ /dev/null @@ -1,146 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.metal.driver - -import com.atlarge.odcsim.ReceivePort -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch - -/** - * Messages that may be sent to the management interface of a bare-metal compute [Node], similar to the - * [BareMetalDriver] interface. - */ -public sealed class NodeRequest { - /** - * Initialize the compute node. - */ - public data class Initialize(public val monitor: ServerMonitor) : NodeRequest() - - /** - * Update the power state of the compute node. - */ - public data class SetPowerState(public val state: PowerState) : NodeRequest() - - /** - * Update the boot disk image of the compute node. - */ - public data class SetImage(public val image: Image) : NodeRequest() - - /** - * Obtain the state of the compute node. - */ - public object Refresh : NodeRequest() -} - -/** - * Responses emitted by a bare-metal compute [Node]. - */ -public sealed class NodeResponse { - /** - * The node that sent this response. - */ - public abstract val node: Node - - /** - * A response sent when the bare metal driver has been initialized. - */ - public data class Initialized(public override val node: Node) : NodeResponse() - - /** - * A response sent to indicate the power state of the node changed. - */ - public data class PowerStateChanged(public override val node: Node) : NodeResponse() - - /** - * A response sent to indicate the image of a node was changed. - */ - public data class ImageChanged(public override val node: Node) : NodeResponse() - - /** - * A response sent for obtaining the refreshed [Node] instance. - */ - public data class Refreshed(public override val node: Node) : NodeResponse() -} - -/** - * Serialize the specified [BareMetalDriver] instance in order to safely send this object across logical processes. - */ -public suspend fun BareMetalDriver.serialize(): BareMetalDriver { - val ctx = processContext - val input = ctx.open<NodeRequest>() - val output = ctx.open<NodeResponse>() - - ctx.launch { - val outlet = processContext.connect(output.send) - val inlet = processContext.listen(input.receive) - - while (isActive) { - when (val msg = inlet.receive()) { - is NodeRequest.Initialize -> - outlet.send(NodeResponse.Initialized(init(msg.monitor))) - is NodeRequest.SetPowerState -> - outlet.send(NodeResponse.PowerStateChanged(setPower(msg.state))) - is NodeRequest.SetImage -> - outlet.send(NodeResponse.ImageChanged(setImage(msg.image))) - is NodeRequest.Refresh -> - outlet.send(NodeResponse.Refreshed(refresh())) - } - } - } - - return object : BareMetalDriver { - private lateinit var inlet: ReceivePort<NodeResponse> - private lateinit var outlet: SendPort<NodeRequest> - - override suspend fun init(monitor: ServerMonitor): Node { - outlet = processContext.connect(input.send) - inlet = processContext.listen(output.receive) - - outlet.send(NodeRequest.Initialize(monitor)) - return (inlet.receive() as NodeResponse.Initialized).node - } - - override suspend fun setPower(powerState: PowerState): Node { - outlet.send(NodeRequest.SetPowerState(powerState)) - return (inlet.receive() as NodeResponse.PowerStateChanged).node - } - - override suspend fun setImage(image: Image): Node { - outlet.send(NodeRequest.SetImage(image)) - return (inlet.receive() as NodeResponse.ImageChanged).node - } - - override suspend fun refresh(): Node { - outlet.send(NodeRequest.Refresh) - return (inlet.receive() as NodeResponse.Refreshed).node - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index b6d74cde..1adc8652 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -39,11 +39,14 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil import kotlin.math.max import kotlin.math.min +import kotlinx.coroutines.withContext /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -51,12 +54,15 @@ import kotlin.math.min * @param uid The unique identifier of the machine. * @param name An optional name of the machine. * @param cpuNodes The CPU nodes/packages available to the bare metal machine. + * @param memoryUnits The memory units in this machine. + * @param domain The simulation domain the driver runs in. */ public class SimpleBareMetalDriver( uid: UUID, name: String, val cpuNodes: List<ProcessingUnit>, - val memoryUnits: List<MemoryUnit> + val memoryUnits: List<MemoryUnit>, + private val domain: Domain ) : BareMetalDriver { /** * The monitor to use. @@ -73,12 +79,17 @@ public class SimpleBareMetalDriver( */ private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum()) - override suspend fun init(monitor: ServerMonitor): Node { - this.monitor = monitor - return node + /** + * The job that is running the image. + */ + private var job: Job? = null + + override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { + this@SimpleBareMetalDriver.monitor = monitor + return@withContext node } - override suspend fun setPower(powerState: PowerState): Node { + override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) { val previousPowerState = node.powerState val server = when (node.powerState to powerState) { PowerState.POWER_OFF to PowerState.POWER_OFF -> null @@ -100,36 +111,36 @@ public class SimpleBareMetalDriver( launch() } - return node + return@withContext node } - override suspend fun setImage(image: Image): Node { + override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { node = node.copy(image = image) - return node + return@withContext node } - override suspend fun refresh(): Node = node + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } /** * Launch the server image on the machine. */ private suspend fun launch() { - val serverCtx = this.serverCtx + val serverContext = serverCtx - processContext.spawn { - serverCtx.init() + job = domain.launch { + serverContext.init() try { - node.server!!.image(serverCtx) - serverCtx.exit() + node.server!!.image(serverContext) + serverContext.exit() } catch (cause: Throwable) { - serverCtx.exit(cause) + serverContext.exit(cause) } } } private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - val start = processContext.clock.millis() + val start = simulationContext.clock.millis() val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz try { @@ -141,7 +152,7 @@ public class SimpleBareMetalDriver( } catch (_: CancellationException) { // On cancellation, we compute and return the remaining burst } - val end = processContext.clock.millis() + val end = simulationContext.clock.millis() val granted = ceil((end - start) / 1000.0 * usage).toLong() return max(0, burst - granted) } @@ -149,7 +160,6 @@ public class SimpleBareMetalDriver( private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - private lateinit var ctx: ProcessContext override val cpus: List<ProcessorContextImpl> = cpuNodes .asSequence() @@ -172,7 +182,6 @@ public class SimpleBareMetalDriver( val previousState = server.state server = server.copy(state = ServerState.ACTIVE) monitor.onUpdate(server, previousState) - ctx = processContext initialized = true } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index 6b5c0979..b18a4006 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.metal.service +import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image @@ -31,11 +32,12 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService : ProvisioningService, ServerMonitor { +public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor { /** * The active nodes in this service. */ @@ -46,29 +48,31 @@ public class SimpleProvisioningService : ProvisioningService, ServerMonitor { */ private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node { - val node = driver.init(this) + override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { + val node = driver.init(this@SimpleProvisioningService) nodes[node] = driver - return node + return@withContext node } - override suspend fun nodes(): Set<Node> = nodes.keys + override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys } - override suspend fun refresh(node: Node): Node { - return nodes[node]!!.refresh() + override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) { + return@withContext nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node { + override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { val driver = nodes[node]!! driver.setImage(image) driver.setPower(PowerState.POWER_OFF) val newNode = driver.setPower(PowerState.POWER_ON) monitors[newNode.server!!] = monitor - return newNode + return@withContext newNode } override suspend fun onUpdate(server: Server, previousState: ServerState) { - monitors[server]?.onUpdate(server, previousState) + withContext(domain.coroutineContext) { + monitors[server]?.onUpdate(server, previousState) + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index c0d5fe0f..9745b56c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerState @@ -65,7 +65,7 @@ class HypervisorVirtDriver( val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) memoryAvailable -= requiredMemory - vms.add(VmServerContext(server, monitor, processContext)) + vms.add(VmServerContext(server, monitor, simulationContext)) return server } @@ -76,11 +76,11 @@ class HypervisorVirtDriver( internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, - ctx: ProcessContext + ctx: SimulationContext ) : ServerManagementContext { private var initialized: Boolean = false - internal val job: Job = ctx.launch { + internal val job: Job = ctx.domain.launch { init() try { server.image(this@VmServerContext) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt index 0b172c61..4cc5ac9e 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.execution.ProcessorContext @@ -91,7 +91,7 @@ public class VmSchedulerImpl( flush() val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment - val call = processContext.launch { + val call = simulationContext.domain.launch { var duration: Long = Long.MAX_VALUE var deadline: Long = Long.MAX_VALUE @@ -121,7 +121,7 @@ public class VmSchedulerImpl( // We run the total burst on the host processor. Note that this call may be cancelled at any moment in // time, so not all of the burst may be executed. val remainder = run(burst, usage, deadline) - val time = processContext.clock.millis() + val time = simulationContext.clock.millis() val totalGrantedBurst: Long = burst - remainder // Compute for each vCPU the diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index ef1528d9..888364e2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -15,7 +15,7 @@ import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.launch class SimpleVirtProvisioningService( - private val ctx: ProcessContext, + private val ctx: SimulationContext, private val provisioningService: ProvisioningService, private val hypervisorMonitor: HypervisorMonitor ) : VirtProvisioningService, ServerMonitor { @@ -50,7 +50,7 @@ class SimpleVirtProvisioningService( internal val imagesByServer: MutableMap<Server, MutableSet<ImageView>> = mutableMapOf() init { - ctx.launch { + ctx.domain.launch { val provisionedNodes = provisioningService.nodes().toList() val deployedNodes = provisionedNodes.map { node -> val hypervisorImage = @@ -72,7 +72,7 @@ class SimpleVirtProvisioningService( } private fun requestCycle() { - ctx.launch { + ctx.domain.launch { schedule() } } |
