summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-16 20:30:17 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-28 14:28:29 +0100
commitb82e573c67f0004945aa18c575268100fb279b56 (patch)
tree7af8e303ea9ab3821e8d8c7c1c7e9b1de058a9e7 /opendc
parent0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff)
refactor: Change from logical processes to simulation domains
This change moves the simulator terminology from logical processes to simulation domains. This prevents the clash with "processes" that we are trying to simulate. In addition, simulation domains allows us to reduce the amount of boilerplate and instead allows for simulation modelled using standard techniques.
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt80
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt146
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt51
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt24
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt10
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt6
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt8
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt34
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt13
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt16
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt34
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt25
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt5
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt24
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt21
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt26
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt10
18 files changed, 177 insertions, 360 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 4e8162ec..436f4653 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -1,6 +1,6 @@
package com.atlarge.opendc.compute.core.image
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import kotlinx.coroutines.coroutineScope
@@ -28,7 +28,7 @@ class VmImage(
coroutineScope {
for (cpu in ctx.cpus.take(cores)) {
val usage = req / (fragment.usage * 1_000_000L)
- launch { cpu.run(req, usage, processContext.clock.millis() + fragment.duration) }
+ launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) }
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt
deleted file mode 100644
index fa9f627b..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.core.monitor
-
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.processContext
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
-
-/**
- * Events emitted by a [Server] instance.
- */
-public sealed class ServerEvent {
- /**
- * The server that emitted this event.
- */
- public abstract val server: Server
-
- /**
- * A response sent when the bare metal driver has been initialized.
- */
- public data class StateChanged(
- public override val server: Server,
- public val previousState: ServerState
- ) : ServerEvent()
-}
-
-/**
- * Serialize the specified [ServerMonitor] instance in order to safely send this object across logical processes.
- */
-public suspend fun ServerMonitor.serialize(): ServerMonitor {
- val ctx = processContext
- val input = ctx.open<ServerEvent>()
-
- ctx.launch {
- val inlet = processContext.listen(input.receive)
-
- while (isActive) {
- when (val msg = inlet.receive()) {
- is ServerEvent.StateChanged -> onUpdate(msg.server, msg.previousState)
- }
- }
- }
-
- return object : ServerMonitor {
- private var outlet: SendPort<ServerEvent>? = null
-
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- if (outlet == null) {
- outlet = processContext.connect(input.send)
- }
-
- outlet!!.send(ServerEvent.StateChanged(server, previousState))
- }
- }
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt
deleted file mode 100644
index a8996f61..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.metal.driver
-
-import com.atlarge.odcsim.ReceivePort
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.processContext
-import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
-
-/**
- * Messages that may be sent to the management interface of a bare-metal compute [Node], similar to the
- * [BareMetalDriver] interface.
- */
-public sealed class NodeRequest {
- /**
- * Initialize the compute node.
- */
- public data class Initialize(public val monitor: ServerMonitor) : NodeRequest()
-
- /**
- * Update the power state of the compute node.
- */
- public data class SetPowerState(public val state: PowerState) : NodeRequest()
-
- /**
- * Update the boot disk image of the compute node.
- */
- public data class SetImage(public val image: Image) : NodeRequest()
-
- /**
- * Obtain the state of the compute node.
- */
- public object Refresh : NodeRequest()
-}
-
-/**
- * Responses emitted by a bare-metal compute [Node].
- */
-public sealed class NodeResponse {
- /**
- * The node that sent this response.
- */
- public abstract val node: Node
-
- /**
- * A response sent when the bare metal driver has been initialized.
- */
- public data class Initialized(public override val node: Node) : NodeResponse()
-
- /**
- * A response sent to indicate the power state of the node changed.
- */
- public data class PowerStateChanged(public override val node: Node) : NodeResponse()
-
- /**
- * A response sent to indicate the image of a node was changed.
- */
- public data class ImageChanged(public override val node: Node) : NodeResponse()
-
- /**
- * A response sent for obtaining the refreshed [Node] instance.
- */
- public data class Refreshed(public override val node: Node) : NodeResponse()
-}
-
-/**
- * Serialize the specified [BareMetalDriver] instance in order to safely send this object across logical processes.
- */
-public suspend fun BareMetalDriver.serialize(): BareMetalDriver {
- val ctx = processContext
- val input = ctx.open<NodeRequest>()
- val output = ctx.open<NodeResponse>()
-
- ctx.launch {
- val outlet = processContext.connect(output.send)
- val inlet = processContext.listen(input.receive)
-
- while (isActive) {
- when (val msg = inlet.receive()) {
- is NodeRequest.Initialize ->
- outlet.send(NodeResponse.Initialized(init(msg.monitor)))
- is NodeRequest.SetPowerState ->
- outlet.send(NodeResponse.PowerStateChanged(setPower(msg.state)))
- is NodeRequest.SetImage ->
- outlet.send(NodeResponse.ImageChanged(setImage(msg.image)))
- is NodeRequest.Refresh ->
- outlet.send(NodeResponse.Refreshed(refresh()))
- }
- }
- }
-
- return object : BareMetalDriver {
- private lateinit var inlet: ReceivePort<NodeResponse>
- private lateinit var outlet: SendPort<NodeRequest>
-
- override suspend fun init(monitor: ServerMonitor): Node {
- outlet = processContext.connect(input.send)
- inlet = processContext.listen(output.receive)
-
- outlet.send(NodeRequest.Initialize(monitor))
- return (inlet.receive() as NodeResponse.Initialized).node
- }
-
- override suspend fun setPower(powerState: PowerState): Node {
- outlet.send(NodeRequest.SetPowerState(powerState))
- return (inlet.receive() as NodeResponse.PowerStateChanged).node
- }
-
- override suspend fun setImage(image: Image): Node {
- outlet.send(NodeRequest.SetImage(image))
- return (inlet.receive() as NodeResponse.ImageChanged).node
- }
-
- override suspend fun refresh(): Node {
- outlet.send(NodeRequest.Refresh)
- return (inlet.receive() as NodeResponse.Refreshed).node
- }
- }
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index b6d74cde..1adc8652 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
@@ -39,11 +39,14 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.PowerState
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
+import kotlinx.coroutines.withContext
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -51,12 +54,15 @@ import kotlin.math.min
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
* @param cpuNodes The CPU nodes/packages available to the bare metal machine.
+ * @param memoryUnits The memory units in this machine.
+ * @param domain The simulation domain the driver runs in.
*/
public class SimpleBareMetalDriver(
uid: UUID,
name: String,
val cpuNodes: List<ProcessingUnit>,
- val memoryUnits: List<MemoryUnit>
+ val memoryUnits: List<MemoryUnit>,
+ private val domain: Domain
) : BareMetalDriver {
/**
* The monitor to use.
@@ -73,12 +79,17 @@ public class SimpleBareMetalDriver(
*/
private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum())
- override suspend fun init(monitor: ServerMonitor): Node {
- this.monitor = monitor
- return node
+ /**
+ * The job that is running the image.
+ */
+ private var job: Job? = null
+
+ override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
+ this@SimpleBareMetalDriver.monitor = monitor
+ return@withContext node
}
- override suspend fun setPower(powerState: PowerState): Node {
+ override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) {
val previousPowerState = node.powerState
val server = when (node.powerState to powerState) {
PowerState.POWER_OFF to PowerState.POWER_OFF -> null
@@ -100,36 +111,36 @@ public class SimpleBareMetalDriver(
launch()
}
- return node
+ return@withContext node
}
- override suspend fun setImage(image: Image): Node {
+ override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
node = node.copy(image = image)
- return node
+ return@withContext node
}
- override suspend fun refresh(): Node = node
+ override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
/**
* Launch the server image on the machine.
*/
private suspend fun launch() {
- val serverCtx = this.serverCtx
+ val serverContext = serverCtx
- processContext.spawn {
- serverCtx.init()
+ job = domain.launch {
+ serverContext.init()
try {
- node.server!!.image(serverCtx)
- serverCtx.exit()
+ node.server!!.image(serverContext)
+ serverContext.exit()
} catch (cause: Throwable) {
- serverCtx.exit(cause)
+ serverContext.exit(cause)
}
}
}
private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext {
override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long {
- val start = processContext.clock.millis()
+ val start = simulationContext.clock.millis()
val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz
try {
@@ -141,7 +152,7 @@ public class SimpleBareMetalDriver(
} catch (_: CancellationException) {
// On cancellation, we compute and return the remaining burst
}
- val end = processContext.clock.millis()
+ val end = simulationContext.clock.millis()
val granted = ceil((end - start) / 1000.0 * usage).toLong()
return max(0, burst - granted)
}
@@ -149,7 +160,6 @@ public class SimpleBareMetalDriver(
private val serverCtx = object : ServerManagementContext {
private var initialized: Boolean = false
- private lateinit var ctx: ProcessContext
override val cpus: List<ProcessorContextImpl> = cpuNodes
.asSequence()
@@ -172,7 +182,6 @@ public class SimpleBareMetalDriver(
val previousState = server.state
server = server.copy(state = ServerState.ACTIVE)
monitor.onUpdate(server, previousState)
- ctx = processContext
initialized = true
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index 6b5c0979..b18a4006 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.compute.metal.service
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
@@ -31,11 +32,12 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.PowerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService : ProvisioningService, ServerMonitor {
+public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor {
/**
* The active nodes in this service.
*/
@@ -46,29 +48,31 @@ public class SimpleProvisioningService : ProvisioningService, ServerMonitor {
*/
private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf()
- override suspend fun create(driver: BareMetalDriver): Node {
- val node = driver.init(this)
+ override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
+ val node = driver.init(this@SimpleProvisioningService)
nodes[node] = driver
- return node
+ return@withContext node
}
- override suspend fun nodes(): Set<Node> = nodes.keys
+ override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys }
- override suspend fun refresh(node: Node): Node {
- return nodes[node]!!.refresh()
+ override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) {
+ return@withContext nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node {
+ override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
val driver = nodes[node]!!
driver.setImage(image)
driver.setPower(PowerState.POWER_OFF)
val newNode = driver.setPower(PowerState.POWER_ON)
monitors[newNode.server!!] = monitor
- return newNode
+ return@withContext newNode
}
override suspend fun onUpdate(server: Server, previousState: ServerState) {
- monitors[server]?.onUpdate(server, previousState)
+ withContext(domain.coroutineContext) {
+ monitors[server]?.onUpdate(server, previousState)
+ }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index c0d5fe0f..9745b56c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerState
@@ -65,7 +65,7 @@ class HypervisorVirtDriver(
val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD)
memoryAvailable -= requiredMemory
- vms.add(VmServerContext(server, monitor, processContext))
+ vms.add(VmServerContext(server, monitor, simulationContext))
return server
}
@@ -76,11 +76,11 @@ class HypervisorVirtDriver(
internal inner class VmServerContext(
override var server: Server,
val monitor: ServerMonitor,
- ctx: ProcessContext
+ ctx: SimulationContext
) : ServerManagementContext {
private var initialized: Boolean = false
- internal val job: Job = ctx.launch {
+ internal val job: Job = ctx.domain.launch {
init()
try {
server.image(this@VmServerContext)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
index 0b172c61..4cc5ac9e 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.execution.ProcessorContext
@@ -91,7 +91,7 @@ public class VmSchedulerImpl(
flush()
val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment
- val call = processContext.launch {
+ val call = simulationContext.domain.launch {
var duration: Long = Long.MAX_VALUE
var deadline: Long = Long.MAX_VALUE
@@ -121,7 +121,7 @@ public class VmSchedulerImpl(
// We run the total burst on the host processor. Note that this call may be cancelled at any moment in
// time, so not all of the burst may be executed.
val remainder = run(burst, usage, deadline)
- val time = processContext.clock.millis()
+ val time = simulationContext.clock.millis()
val totalGrantedBurst: Long = burst - remainder
// Compute for each vCPU the
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index ef1528d9..888364e2 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,6 +1,6 @@
package com.atlarge.opendc.compute.virt.service
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
@@ -15,7 +15,7 @@ import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.launch
class SimpleVirtProvisioningService(
- private val ctx: ProcessContext,
+ private val ctx: SimulationContext,
private val provisioningService: ProvisioningService,
private val hypervisorMonitor: HypervisorMonitor
) : VirtProvisioningService, ServerMonitor {
@@ -50,7 +50,7 @@ class SimpleVirtProvisioningService(
internal val imagesByServer: MutableMap<Server, MutableSet<ImageView>> = mutableMapOf()
init {
- ctx.launch {
+ ctx.domain.launch {
val provisionedNodes = provisioningService.nodes().toList()
val deployedNodes = provisionedNodes.map { node ->
val hypervisorImage =
@@ -72,7 +72,7 @@ class SimpleVirtProvisioningService(
}
private fun requestCycle() {
- ctx.launch {
+ ctx.domain.launch {
schedule()
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index dc4f8078..6b234b73 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,14 +25,17 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.PowerState
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.util.ServiceLoader
import java.util.UUID
@@ -43,26 +46,35 @@ internal class SimpleBareMetalDriverTest {
*/
@Test
fun smoke() {
+ var finalState: ServerState = ServerState.BUILD
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ _ ->
- val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
+ val system = provider("sim")
+ val root = system.newDomain(name = "root")
+ root.launch {
+ val dom = root.newDomain(name = "driver")
+ val flavor = Flavor(4, 0)
+ val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom)
+
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println(server)
+ finalState = server.state
}
}
- val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList())
+ val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
- driver.init(monitor)
- driver.setImage(image)
- driver.setPower(PowerState.POWER_ON)
- delay(5)
- println(driver.refresh())
- }, name = "sim")
+ // Batch driver commands
+ withContext(dom.coroutineContext) {
+ driver.init(monitor)
+ driver.setImage(image)
+ driver.setPower(PowerState.POWER_ON)
+ }
+ }
runBlocking {
system.run()
system.terminate()
}
+
+ assertEquals(finalState, ServerState.SHUTOFF)
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index 85e3383c..3b32b3b8 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import java.util.ServiceLoader
@@ -47,21 +48,25 @@ internal class SimpleProvisioningServiceTest {
@Test
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ _ ->
+ val system = provider("sim")
+ val root = system.newDomain(name = "root")
+ root.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
println(server)
}
}
- val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList())
- val provisioner = SimpleProvisioningService()
+ val dom = root.newDomain("provisioner")
+ val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom)
+
+ val provisioner = SimpleProvisioningService(dom)
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
provisioner.deploy(nodes.first(), image, monitor)
- }, name = "sim")
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index f59f4830..002fa175 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -25,7 +25,7 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
@@ -37,6 +37,7 @@ import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import java.util.ServiceLoader
@@ -52,7 +53,10 @@ internal class HypervisorTest {
@Test
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ _ ->
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ root.launch {
val vmm = HypervisorImage(object : HypervisorMonitor {
override fun onSliceFinish(
time: Long,
@@ -68,10 +72,12 @@ internal class HypervisorTest {
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1)
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${processContext.clock.millis()}]: $server")
+ println("[${simulationContext.clock.millis()}]: $server")
}
}
- val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList())
+
+ val driverDom = root.newDomain("driver")
+ val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList(), driverDom)
metalDriver.init(monitor)
metalDriver.setImage(vmm)
@@ -82,7 +88,7 @@ internal class HypervisorTest {
val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver]
vmDriver.spawn(workloadA, monitor, flavor)
vmDriver.spawn(workloadB, monitor, flavor)
- }, name = "sim")
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index 96796c07..d5e1404a 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc18
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -39,12 +40,14 @@ import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
+import kotlin.math.max
+import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
import java.util.ServiceLoader
-import kotlin.math.max
/**
* Main entry point of the experiment.
@@ -55,9 +58,6 @@ fun main(args: Array<String>) {
return
}
- val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.read() }
-
var total = 0
var finished = 0
@@ -85,11 +85,16 @@ fun main(args: Array<String>) {
}
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ ctx ->
- println(ctx.clock.instant())
- val scheduler = StageWorkflowService(
- ctx,
- environment.platforms[0].zones[0].services[ProvisioningService.Key],
+ val system = provider(name = "sim")
+
+ val schedulerDomain = system.newDomain(name = "scheduler")
+ val schedulerAsync = schedulerDomain.async {
+ val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
+ .use { it.construct(system.newDomain("topology")) }
+
+ StageWorkflowService(
+ schedulerDomain,
+ environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
@@ -98,8 +103,13 @@ fun main(args: Array<String>) {
resourceFilterPolicy = FunctionalResourceFilterPolicy,
resourceSelectionPolicy = FirstFitResourceSelectionPolicy
)
+ }
+ val broker = system.newDomain(name = "broker")
+ broker.launch {
+ val ctx = simulationContext
val reader = GwfTraceReader(File(args[0]))
+ val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
@@ -107,11 +117,7 @@ fun main(args: Array<String>) {
delay(max(0, time * 1000 - ctx.clock.millis()))
scheduler.submit(job, monitor)
}
-
- token.receive()
-
- println(ctx.clock.instant())
- }, name = "sim")
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 3882feb7..48aca303 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc20
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
@@ -35,6 +36,7 @@ import com.atlarge.opendc.format.environment.sc20.Sc20EnvironmentReader
import com.atlarge.opendc.format.trace.vm.VmTraceReader
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
import java.util.ServiceLoader
@@ -48,10 +50,6 @@ fun main(args: Array<String>) {
println("error: Please provide path to directory containing VM trace files")
return
}
-
- val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json"))
- .use { it.read() }
-
val token = Channel<Boolean>()
val monitor = object : ServerMonitor {
@@ -61,10 +59,17 @@ fun main(args: Array<String>) {
}
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ ctx ->
- println(ctx.clock.instant())
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ root.launch {
+ val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json"))
+ .use { it.construct(root) }
+
+ println(simulationContext.clock.instant())
+
val scheduler = SimpleVirtProvisioningService(
- ctx,
+ simulationContext,
environment.platforms[0].zones[0].services[ProvisioningService.Key],
Sc20HypervisorMonitor()
)
@@ -73,14 +78,14 @@ fun main(args: Array<String>) {
delay(1376314846 * 1000L)
while (reader.hasNext()) {
val (time, workload) = reader.next()
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - simulationContext.clock.millis()))
scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory))
}
token.receive()
- println(ctx.clock.instant())
- }, name = "sim")
+ println(simulationContext.clock.instant())
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
index 6ca53a05..42551f43 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.format.environment
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.core.Environment
import java.io.Closeable
@@ -32,7 +33,7 @@ import java.io.Closeable
*/
interface EnvironmentReader : Closeable {
/**
- * Read the description of the datacenter environment as [Environment].
+ * Construct an [Environment] in the specified domain.
*/
- fun read(): Environment
+ suspend fun construct(dom: Domain): Environment
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index ac44337a..8898ddc7 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.format.environment.sc18
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
@@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.runBlocking
import java.io.InputStream
import java.util.UUID
@@ -52,10 +52,11 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
/**
* The environment that was read from the file.
*/
- private val environment: Environment
+ private val setup: Setup = mapper.readValue(input)
+
+ override suspend fun construct(dom: Domain): Environment {
+ val provisioningDomain = dom.newDomain("provisioner")
- init {
- val setup = mapper.readValue<Setup>(input)
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -69,18 +70,17 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000)))
+ SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000)),
+ dom.newDomain("node-$counter"))
}
}
}
}
}
- val provisioningService = SimpleProvisioningService()
- runBlocking {
- for (node in nodes) {
- provisioningService.create(node)
- }
+ val provisioningService = SimpleProvisioningService(provisioningDomain)
+ for (node in nodes) {
+ provisioningService.create(node)
}
val serviceRegistry = ServiceRegistryImpl()
@@ -92,10 +92,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
)
)
- environment = Environment(setup.name, null, listOf(platform))
+ return Environment(setup.name, null, listOf(platform))
}
- override fun read(): Environment = environment
-
override fun close() {}
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index 5eb711cc..fecba302 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.format.environment.sc20
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
@@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.runBlocking
import java.io.InputStream
import java.util.UUID
@@ -51,10 +51,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
/**
* The environment that was read from the file.
*/
- private val environment: Environment
+ private val setup: Setup = mapper.readValue(input)
- init {
- val setup = mapper.readValue<Setup>(input)
+ override suspend fun construct(dom: Domain): Environment {
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -74,18 +73,16 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories)
+ SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories, dom.newDomain("node-$counter"))
}
}
}
}
}
- val provisioningService = SimpleProvisioningService()
- runBlocking {
- for (node in nodes) {
- provisioningService.create(node)
- }
+ val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ for (node in nodes) {
+ provisioningService.create(node)
}
val serviceRegistry = ServiceRegistryImpl()
@@ -97,10 +94,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
)
)
- environment = Environment(setup.name, null, listOf(platform))
+ return Environment(setup.name, null, listOf(platform))
}
- override fun read(): Environment = environment
-
override fun close() {}
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 48f06bcd..008cd1ee 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,7 +24,8 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
@@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job
import java.util.PriorityQueue
import java.util.Queue
import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Datacenter Scheduling.
*/
class StageWorkflowService(
- private val ctx: ProcessContext,
+ private val domain: Domain,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -167,7 +169,7 @@ class StageWorkflowService(
private val resourceSelectionPolicy: Comparator<Node>
init {
- ctx.launch {
+ domain.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -181,9 +183,9 @@ class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override suspend fun submit(job: Job, monitor: WorkflowMonitor) {
+ override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, monitor, ctx.clock.millis())
+ val jobInstance = JobState(job, monitor, simulationContext.clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -230,7 +232,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis())
+ jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis())
rootListener.jobStarted(jobInstance)
}
@@ -292,23 +294,23 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = ctx.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis())
+ task.startedAt = simulationContext.clock.millis()
+ task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = ctx.clock.millis()
+ task.finishedAt = simulationContext.clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis())
+ job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -333,7 +335,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- job.monitor.onJobFinish(job.job, ctx.clock.millis())
+ job.monitor.onJobFinish(job.job, simulationContext.clock.millis())
rootListener.jobFinished(job)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index cfec93b5..776f0b07 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
val delay = quantum - (ctx.clock.millis() % quantum)
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()