diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 21:14:20 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 23:40:57 +0200 |
| commit | c41d201343263346ac84855a0b2254051ed33c21 (patch) | |
| tree | 9141a382f9e1b2d924e9a191e53cc6daa9107563 /simulator | |
| parent | c543f55e961f9f7468e19c1c0f5f20566d07dfb5 (diff) | |
Eliminate use of Domain and simulationContext in OpenDC
This change takes the first step in eliminating the explict use of
Domain and simulationContext from OpenDC. In this way, we decouple the
logic of various datacenter services from simulation logic, which should
promote re-use.
Diffstat (limited to 'simulator')
54 files changed, 446 insertions, 356 deletions
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt index c51d1d8b..3423d43a 100644 --- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt +++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt @@ -24,10 +24,10 @@ package com.atlarge.odcsim +import org.slf4j.Logger import java.time.Clock import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -import org.slf4j.Logger /** * Represents the execution context of a simulation domain. diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt index 0e18f82f..5d9af9ec 100644 --- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt +++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt @@ -24,7 +24,6 @@ package com.atlarge.odcsim.flow -import java.util.WeakHashMap import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi @@ -33,6 +32,7 @@ import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.consumeAsFlow +import java.util.WeakHashMap /** * A [Flow] that can be used to emit events. diff --git a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index e675b877..e4d0f4ac 100644 --- a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -28,13 +28,6 @@ import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.engine.omega.logging.LoggerImpl -import java.time.Clock -import java.time.Instant -import java.time.ZoneId -import java.util.PriorityQueue -import java.util.UUID -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.coroutineContext import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineExceptionHandler @@ -48,6 +41,13 @@ import kotlinx.coroutines.Runnable import kotlinx.coroutines.SupervisorJob import org.jetbrains.annotations.Async import org.slf4j.Logger +import java.time.Clock +import java.time.Instant +import java.time.ZoneId +import java.util.PriorityQueue +import java.util.UUID +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext /** * The reference implementation of the [SimulationEngine] instance for the OpenDC simulation core. @@ -71,12 +71,14 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine /** * The event queue to process */ - private val queue: PriorityQueue<Event> = PriorityQueue(Comparator<Event> { lhs, rhs -> - // Note that Comparator gives better performance than Comparable according to - // profiling - val cmp = lhs.time.compareTo(rhs.time) - if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp - }) + private val queue: PriorityQueue<Event> = PriorityQueue( + Comparator<Event> { lhs, rhs -> + // Note that Comparator gives better performance than Comparable according to +// profiling + val cmp = lhs.time.compareTo(rhs.time) + if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp + } + ) /** * The active processes in the simulation engine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index fd0fc836..01968cd8 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A server instance that is running on some physical or virtual machine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index f770fa49..817bee4b 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select +import java.time.Clock /** * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** + * The virtual clock. + */ + public val clock: Clock + + /** * The server on which the image runs. */ public val server: Server diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index c615d865..0e1af093 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.compute.core.image -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import java.util.UUID @@ -16,8 +15,7 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - val clock = simulationContext.clock - var offset = clock.millis() + var offset = ctx.clock.millis() val batch = flopsHistory.map { fragment -> val cores = min(fragment.cores, ctx.server.flavor.cpuCount) diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index cb637aea..7cb4c0c5 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 17d8ee53..41cec291 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for the management interface of a bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index a453e459..c118cc3d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.flow.StateFlow import com.atlarge.opendc.compute.core.Flavor @@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.lang.Exception -import java.time.Clock -import java.util.UUID -import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle @@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.withContext +import java.lang.Exception +import java.time.Clock +import java.util.UUID +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. * - * @param domain The simulation domain the driver runs in. + * @param coroutineScope The [CoroutineScope] the driver runs in. + * @param clock The virtual clock to keep track of time. * @param uid The unique identifier of the machine. * @param name An optional name of the machine. * @param metadata The initial metadata of the node. @@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext */ @OptIn(ExperimentalCoroutinesApi::class) public class SimpleBareMetalDriver( - private val domain: Domain, + private val coroutineScope: CoroutineScope, + private val clock: Clock, uid: UUID, name: String, metadata: Map<String, Any>, @@ -129,14 +128,14 @@ public class SimpleBareMetalDriver( */ private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - override suspend fun init(): Node = withContext(domain.coroutineContext) { - nodeState.value + override suspend fun init(): Node { + return nodeState.value } - override suspend fun start(): Node = withContext(domain.coroutineContext) { + override suspend fun start(): Node { val node = nodeState.value if (node.state != NodeState.SHUTOFF) { - return@withContext node + return node } val events = EventFlow<ServerEvent>() @@ -153,13 +152,13 @@ public class SimpleBareMetalDriver( setNode(node.copy(state = NodeState.BOOT, server = server)) serverContext = BareMetalServerContext(events) - return@withContext nodeState.value + return nodeState.value } - override suspend fun stop(): Node = withContext(domain.coroutineContext) { + override suspend fun stop(): Node { val node = nodeState.value if (node.state == NodeState.SHUTOFF) { - return@withContext node + return node } // We terminate the image running on the machine @@ -167,20 +166,20 @@ public class SimpleBareMetalDriver( serverContext = null setNode(node.copy(state = NodeState.SHUTOFF, server = null)) - return@withContext node + return node } - override suspend fun reboot(): Node = withContext(domain.coroutineContext) { + override suspend fun reboot(): Node { stop() - start() + return start() } - override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun setImage(image: Image): Node { setNode(nodeState.value.copy(image = image)) - return@withContext nodeState.value + return nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } + override suspend fun refresh(): Node = nodeState.value private fun setNode(value: Node) { val field = nodeState.value @@ -207,7 +206,10 @@ public class SimpleBareMetalDriver( override val server: Server get() = nodeState.value.server!! - private val job = domain.launch { + override val clock: Clock + get() = this@SimpleBareMetalDriver.clock + + private val job = coroutineScope.launch { delay(1) // TODO Introduce boot time init() try { @@ -265,18 +267,13 @@ public class SimpleBareMetalDriver( private var usageFlush: DisposableHandle? = null /** - * Cache the [Clock] for timing. - */ - private val clock = domain.coroutineContext[SimulationContext]!!.clock - - /** * Cache the [Delay] instance for timing. * * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. * XXX Note however that this is an ugly hack which may break in the future. */ @OptIn(InternalCoroutinesApi::class) - private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay + private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay @OptIn(InternalCoroutinesApi::class) override fun onRun( @@ -353,10 +350,10 @@ public class SimpleBareMetalDriver( currentDisposable?.dispose() // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1, Runnable { + usageFlush = delay.invokeOnTimeout(1) { usageState.value = 0.0 usageFlush = null - }) + } } select.disposeOnSelect(disposable) @@ -458,7 +455,7 @@ public class SimpleBareMetalDriver( } override val scope: CoroutineScope - get() = domain + get() = coroutineScope override suspend fun fail() { serverContext?.unavailable = true diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index f6b236ae..f64f9b5a 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -24,44 +24,41 @@ package com.atlarge.opendc.compute.metal.service -import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService(val domain: Domain) : ProvisioningService { +public class SimpleProvisioningService : ProvisioningService { /** * The active nodes in this service. */ private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { + override suspend fun create(driver: BareMetalDriver): Node { val node = driver.init() nodes[node] = driver - return@withContext node + return node } - override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys } + override suspend fun nodes(): Set<Node> = nodes.keys - override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) { - return@withContext nodes[node]!!.refresh() + override suspend fun refresh(node: Node): Node { + return nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun deploy(node: Node, image: Image): Node { val driver = nodes[node]!! driver.setImage(image) - val newNode = driver.reboot() - return@withContext newNode + return driver.reboot() } - override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) { + override suspend fun stop(node: Node): Node { val driver = nodes[node]!! - try { + return try { driver.stop() } catch (e: CancellationException) { node diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt index 1e7e351f..69b0124d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 607759a8..bd395f0d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.core.resource.TagContainer -import java.util.UUID import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine +import java.util.UUID /** * A hypervisor managing the VMs of a node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 192db413..df45f440 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server @@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DisposableHandle @@ -59,6 +54,17 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select +import mu.KotlinLogging +import java.time.Clock +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} /** * A [VirtDriver] that is backed by a simple hypervisor implementation. @@ -97,7 +103,7 @@ class SimpleVirtDriver( scheduler() } catch (e: Exception) { if (e !is CancellationException) { - simulationContext.log.error("Hypervisor scheduler failed", e) + logger.error("Hypervisor scheduler failed", e) } throw e } @@ -117,8 +123,14 @@ class SimpleVirtDriver( val events = EventFlow<ServerEvent>() val server = Server( - UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD, - ServiceRegistry(), events + UUID.randomUUID(), + name, + emptyMap(), + flavor, + image, + ServerState.BUILD, + ServiceRegistry(), + events ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events)) @@ -181,7 +193,7 @@ class SimpleVirtDriver( * The scheduling process of the hypervisor. */ private suspend fun scheduler() { - val clock = simulationContext.clock + val clock = hostContext.clock val maxUsage = hostContext.cpus.sumByDouble { it.frequency } val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } @@ -561,6 +573,9 @@ class SimpleVirtDriver( override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + override val clock: Clock + get() = hostContext.clock + init { vm = Vm(this) } diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index b1844f67..1002d382 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for a hypervisor running on some host server and communicating with the central compute service to diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 79388bc3..6b2cfc40 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,8 +1,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent @@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.math.max -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext import mu.KotlinLogging +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.math.max private val logger = KotlinLogging.logger {} @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( - public override val allocationPolicy: AllocationPolicy, - private val ctx: SimulationContext, - private val provisioningService: ProvisioningService -) : VirtProvisioningService, CoroutineScope by ctx.domain { + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val provisioningService: ProvisioningService, + override val allocationPolicy: AllocationPolicy +) : VirtProvisioningService { /** * The hypervisors that have been launched by the service. */ @@ -59,11 +53,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - public var submittedVms = 0 - public var queuedVms = 0 - public var runningVms = 0 - public var finishedVms = 0 - public var unscheduledVms = 0 + var submittedVms = 0 + var queuedVms = 0 + var runningVms = 0 + var finishedVms = 0 + var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -81,7 +75,7 @@ class SimpleVirtProvisioningService( override val events: Flow<VirtProvisioningEvent> = eventFlow init { - launch { + coroutineScope.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage @@ -96,27 +90,29 @@ class SimpleVirtProvisioningService( } } - override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) { - availableHypervisors.map { it.driver }.toSet() + override suspend fun drivers(): Set<VirtDriver> { + return availableHypervisors.map { it.driver }.toSet() } override suspend fun deploy( name: String, image: Image, flavor: Flavor - ): Server = withContext(coroutineContext) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - )) - - suspendCancellableCoroutine<Server> { cont -> + ): Server { + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + return suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance requestCycle() @@ -139,9 +135,9 @@ class SimpleVirtProvisioningService( // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = quantum - (ctx.clock.millis() % quantum) + val delay = quantum - (clock.millis() % quantum) - val call = launch { + val call = coroutineScope.launch { delay(delay) this@SimpleVirtProvisioningService.call = null schedule() @@ -150,7 +146,6 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { @@ -159,16 +154,18 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - ++unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + ) + ) incomingImages -= imageInstance @@ -180,7 +177,7 @@ class SimpleVirtProvisioningService( } try { - logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } + logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -197,16 +194,18 @@ class SimpleVirtProvisioningService( imageInstance.server = server imageInstance.continuation.resume(server) - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) activeImages += imageInstance server.events @@ -214,18 +213,20 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - )) + logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } + + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -238,7 +239,7 @@ class SimpleVirtProvisioningService( } } } - .launchIn(this) + .launchIn(coroutineScope) } catch (e: InsufficientMemoryOnServerException) { logger.error("Failed to deploy VM", e) @@ -254,7 +255,7 @@ class SimpleVirtProvisioningService( private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" } if (server in hypervisors) { // Corner case for when the hypervisor already exists @@ -272,16 +273,18 @@ class SimpleVirtProvisioningService( hypervisors[server] = hv } - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) // Re-schedule on the new machine if (incomingImages.isNotEmpty()) { @@ -289,20 +292,22 @@ class SimpleVirtProvisioningService( } } ServerState.SHUTOFF, ServerState.ERROR -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } val hv = hypervisors[server] ?: return availableHypervisors -= hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) if (incomingImages.isNotEmpty()) { requestCycle() @@ -318,16 +323,18 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) hv.driver.events .onEach { event -> @@ -335,7 +342,7 @@ class SimpleVirtProvisioningService( hv.numberOfActiveServers = event.numberOfActiveServers hv.availableMemory = event.availableMemory } - }.launchIn(this) + }.launchIn(coroutineScope) requestCycle() } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt index 1c7b751c..417db77d 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.compute.core.image -import java.util.UUID import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import java.util.UUID /** * Test suite for [FlopsApplicationImage] diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index af9d3421..80c9c547 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID internal class SimpleBareMetalDriverTest { /** @@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index ed2256c0..37cd5898 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -25,17 +25,18 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID /** * Test suite for the [SimpleProvisioningService]. @@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest { val system = provider("sim") val root = system.newDomain(name = "root") root.launch { + val clock = simulationContext.clock val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - val provisioner = SimpleProvisioningService(dom) + val provisioner = SimpleProvisioningService() provisioner.create(driver) delay(5) val nodes = provisioner.nodes() diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 622b185e..528434b1 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.virt import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn @@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import java.util.ServiceLoader +import java.util.UUID /** * Basic test-suite for the hypervisor. @@ -62,6 +63,7 @@ internal class HypervisorTest { val root = system.newDomain("root") root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) @@ -70,7 +72,7 @@ internal class HypervisorTest { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -108,26 +110,41 @@ internal class HypervisorTest { var overcommissionedBurst = 0L root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val duration = 5 * 60L - val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) - ), 2, 0) - val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) - ), 2, 0) + val vmImageA = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), + 2, + 0 + ) + val vmImageB = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) + ), + 2, + 0 + ) val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt index f77a581e..87d6b7bd 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt @@ -24,23 +24,20 @@ package com.atlarge.opendc.core.failure -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.simulationContext +import kotlinx.coroutines.* +import java.time.Clock import kotlin.math.exp import kotlin.math.max import kotlin.random.Random import kotlin.random.asJavaRandom -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.launch /** * A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in * isolation, but will trigger other faults. */ public class CorrelatedFaultInjector( - private val domain: Domain, + private val coroutineScope: CoroutineScope, + private val clock: Clock, private val iatScale: Double, private val iatShape: Double, private val sizeScale: Double, @@ -72,7 +69,7 @@ public class CorrelatedFaultInjector( // Clean up the domain if it finishes domain.scope.coroutineContext[Job]!!.invokeOnCompletion { - this@CorrelatedFaultInjector.domain.launch { + this@CorrelatedFaultInjector.coroutineScope.launch { active -= domain if (active.isEmpty()) { @@ -86,7 +83,7 @@ public class CorrelatedFaultInjector( return } - job = this.domain.launch { + job = this.coroutineScope.launch { while (active.isNotEmpty()) { ensureActive() @@ -94,7 +91,7 @@ public class CorrelatedFaultInjector( val d = lognvariate(iatScale, iatShape) * 3.6e6 // Handle long overflow - if (simulationContext.clock.millis() + d <= 0) { + if (clock.millis() + d <= 0) { return@launch } @@ -111,7 +108,7 @@ public class CorrelatedFaultInjector( val df = max(lognvariate(dScale, dShape) * 6e4, 15 * 6e4) // Handle long overflow - if (simulationContext.clock.millis() + df <= 0) { + if (clock.millis() + df <= 0) { return@launch } diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 0f62667f..1b896858 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -25,11 +25,11 @@ package com.atlarge.opendc.core.failure import com.atlarge.odcsim.simulationContext +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlin.math.ln1p import kotlin.math.pow import kotlin.random.Random -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch /** * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 7659b18e..c7577824 100644 --- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -38,9 +38,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import java.io.File -import java.util.ServiceLoader -import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay @@ -48,6 +45,9 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import java.io.File +import java.util.ServiceLoader +import kotlin.math.max /** * Main entry point of the experiment. @@ -68,10 +68,11 @@ fun main(args: Array<String>) { val schedulerDomain = system.newDomain(name = "scheduler") val schedulerAsync = schedulerDomain.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.construct(system.newDomain("topology")) } + .use { it.construct(schedulerDomain) } StageWorkflowService( schedulerDomain, + simulationContext.clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index ec721ff0..cd85351e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -41,9 +41,9 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int +import mu.KotlinLogging import java.io.File import java.io.InputStream -import mu.KotlinLogging /** * The logger for this experiment. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index b09c0dbb..3765f307 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc20.experiment import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent @@ -45,10 +46,6 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay @@ -59,6 +56,10 @@ import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import mu.KotlinLogging +import java.io.File +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random /** * The logger for this experiment. @@ -85,7 +86,7 @@ suspend fun createFailureDomain( val injector = injectors.getOrPut(cluster) { createFaultInjector( - simulationContext.domain, + simulationContext, random, failureInterval ) @@ -99,11 +100,12 @@ suspend fun createFailureDomain( /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector { +fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 return CorrelatedFaultInjector( - domain, + simulationContext.domain, + simulationContext.clock, iatScale = ln(failureInterval), iatShape = 1.03, // Hours sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes @@ -137,7 +139,7 @@ suspend fun createProvisioner( // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy) // Wait for the hypervisors to be spawned delay(10) @@ -219,7 +221,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP domain.launch { chan.send(Unit) val server = scheduler.deploy( - workload.image.name, workload.image, + workload.image.name, + workload.image, Flavor(workload.image.maxCores, workload.image.requiredMemory) ) // Monitor server events diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 1580e4dd..7b42b095 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -38,13 +38,13 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import java.io.File -import java.util.ServiceLoader -import kotlin.random.Random import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import mu.KotlinLogging +import java.io.File +import java.util.ServiceLoader +import kotlin.random.Random /** * The logger for the experiment scenario. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index b931fef9..a06317cb 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -31,8 +31,8 @@ import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File import mu.KotlinLogging +import java.io.File /** * The logger instance to use. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt index a8ee59a8..ddd64560 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt @@ -25,12 +25,12 @@ package com.atlarge.opendc.experiments.sc20.runner.execution import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor -import java.util.concurrent.Executors import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.withContext +import java.util.concurrent.Executors /** * An [ExperimentScheduler] that runs experiments using a local thread pool. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt index 28a19172..3b80276f 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler -import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap /** * The default implementation of the [ExperimentRunner] interface. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt index afa21f93..0a310027 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -25,17 +25,17 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.Event -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread /** * The logging instance to use. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt index 9fa4e0fb..3bc09435 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent -import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import java.io.File /** * A Parquet event writer for [HostEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt index 3d28860c..1f3b0472 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent -import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import java.io.File /** * A Parquet event writer for [ProvisionerEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt index 043e4670..98afe3b8 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent -import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import java.io.File /** * A Parquet event writer for [RunEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index f9709b9f..f1c1dc25 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -30,12 +30,12 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import java.util.UUID import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader +import java.io.File +import java.util.UUID private val logger = KotlinLogging.logger {} @@ -113,7 +113,8 @@ class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val vmWorkload = VmWorkload( - uid, id, + uid, + id, UnnamedUser, VmImage( uid, diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 8b7b222f..9fa33c3f 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -32,14 +32,6 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path @@ -49,6 +41,14 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread +import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -82,11 +82,14 @@ class Sc20StreamingParquetTraceReader( if (selectedVms.isEmpty()) null else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) + FilterCompat.get( + FilterApi.userDefined( + FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) ) - )) + ) /** * A poisonous fragment. @@ -235,7 +238,8 @@ class Sc20StreamingParquetTraceReader( Random(random.nextInt()) ) val vmWorkload = VmWorkload( - uid, "VM Workload $id", + uid, + "VM Workload $id", UnnamedUser, VmImage( uid, diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 56ddbb6d..a2ce3109 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -37,12 +37,6 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.options.split import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.Random -import kotlin.math.max -import kotlin.math.min import me.tongfei.progressbar.ProgressBar import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -51,6 +45,12 @@ import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.Random +import kotlin.math.max +import kotlin.math.min /** * Represents the command for converting traces diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index a46bb3e6..3a2ed4b7 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -30,9 +30,9 @@ import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload import com.atlarge.opendc.experiments.sc20.experiment.model.SamplingStrategy import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.format.trace.TraceEntry +import mu.KotlinLogging import java.util.* import kotlin.random.Random -import mu.KotlinLogging private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index a79e9a5a..5ecf7605 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -42,8 +42,6 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import java.util.ServiceLoader import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -54,6 +52,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import java.io.File +import java.util.ServiceLoader /** * An integration test suite for the SC20 experiments. diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt index 1c3f70e6..4c4dcf37 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.format.environment -import com.atlarge.odcsim.Domain import com.atlarge.opendc.core.Environment +import kotlinx.coroutines.CoroutineScope import java.io.Closeable /** @@ -33,7 +33,7 @@ import java.io.Closeable */ interface EnvironmentReader : Closeable { /** - * Construct an [Environment] in the specified domain. + * Construct an [Environment] in the specified [CoroutineScope]. */ - suspend fun construct(dom: Domain): Environment + suspend fun construct(coroutineScope: CoroutineScope): Environment } diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index a9aa3337..2b608aef 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.environment.sc18 -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -39,6 +39,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.CoroutineScope import java.io.InputStream import java.util.UUID @@ -55,8 +56,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(dom: Domain): Environment { - val provisioningDomain = dom.newDomain("provisioner") + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock var counter = 0 val nodes = setup.rooms.flatMap { room -> @@ -78,7 +79,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } SimpleBareMetalDriver( - dom.newDomain("node-$counter"), + coroutineScope, + clock, UUID.randomUUID(), "node-${counter++}", emptyMap(), @@ -91,14 +93,16 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } - val provisioningService = SimpleProvisioningService(provisioningDomain) + val provisioningService = SimpleProvisioningService() for (node in nodes) { provisioningService.create(node) } val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "sc18-platform", listOf( + UUID.randomUUID(), + "sc18-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index e34ee2dc..49118675 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -38,6 +38,7 @@ import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader +import kotlinx.coroutines.CoroutineScope import java.io.File import java.io.FileInputStream import java.io.InputStream @@ -56,7 +57,9 @@ class Sc20ClusterEnvironmentReader( constructor(file: File) : this(FileInputStream(file)) @Suppress("BlockingMethodInNonBlockingContext") - override suspend fun construct(dom: Domain): Environment { + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock + var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 @@ -105,7 +108,8 @@ class Sc20ClusterEnvironmentReader( repeat(numberOfHosts) { nodes.add( SimpleBareMetalDriver( - dom.newDomain("node-$clusterId-$it"), + coroutineScope, + clock, UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf(NODE_CLUSTER to clusterId), @@ -123,7 +127,7 @@ class Sc20ClusterEnvironmentReader( } } - val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + val provisioningService = SimpleProvisioningService() for (node in nodes) { provisioningService.create(node) } @@ -131,7 +135,9 @@ class Sc20ClusterEnvironmentReader( val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "sc20-platform", listOf( + UUID.randomUUID(), + "sc20-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 4b5d6fb7..f22f595f 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -40,6 +40,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.CoroutineScope import java.io.InputStream import java.util.UUID @@ -55,7 +56,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(dom: Domain): Environment { + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -82,7 +84,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } SimpleBareMetalDriver( - dom.newDomain("node-$counter"), + coroutineScope, + clock, UUID.randomUUID(), "node-${counter++}", emptyMap(), @@ -99,7 +102,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } - val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + val provisioningService = SimpleProvisioningService() for (node in nodes) { provisioningService.create(node) } @@ -107,7 +110,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "sc20-platform", listOf( + UUID.randomUUID(), + "sc20-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 1cabc8bc..6ee43b6a 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -129,7 +129,9 @@ class BitbrainsTraceReader( ) val vmWorkload = VmWorkload( - uuid, "VM Workload $vmId", UnnamedUser, + uuid, + "VM Workload $vmId", + UnnamedUser, VmImage( uuid, vmId.toString(), diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt index 3a4e2e89..6db3975e 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -120,7 +120,8 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { } val workflow = entry.workload val task = Task( - UUID(0L, taskId), "<unnamed>", + UUID(0L, taskId), + "<unnamed>", FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores), HashSet(), mapOf(WORKFLOW_TASK_DEADLINE to runtime) @@ -136,9 +137,11 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { // Fix dependencies and dependents for all tasks taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet<Task>).addAll(dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - }) + (task.dependencies as MutableSet<Task>).addAll( + dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + } + ) } // Create the entry iterator diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index 8e34505a..28dc7793 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -164,7 +164,9 @@ class Sc20TraceReader( Random(random.nextInt()) ) val vmWorkload = VmWorkload( - uuid, "VM Workload $vmId", UnnamedUser, + uuid, + "VM Workload $vmId", + UnnamedUser, VmImage( uuid, vmId, diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt index 2f6ce238..f7c74562 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt @@ -115,7 +115,11 @@ class SwfTraceReader( for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { flopsHistory.add( FlopsHistoryFragment( - tick * 1000L, 0L, sliceDuration * 1000L, 0.0, cores + tick * 1000L, + 0L, + sliceDuration * 1000L, + 0.0, + cores ) ) slicedWaitTime += sliceDuration @@ -129,12 +133,18 @@ class SwfTraceReader( flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice - for (tick in (submitTime + slicedWaitTime) - until (submitTime + slicedWaitTime + runTime - sliceDuration) - step sliceDuration) { + for ( + tick in (submitTime + slicedWaitTime) + until (submitTime + slicedWaitTime + runTime - sliceDuration) + step sliceDuration + ) { flopsHistory.add( FlopsHistoryFragment( - tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, 1.0, cores + tick * 1000L, + flopsFullSlice / sliceDuration, + sliceDuration * 1000L, + 1.0, + cores ) ) } @@ -153,7 +163,9 @@ class SwfTraceReader( val uuid = UUID(0L, jobNumber) val vmWorkload = VmWorkload( - uuid, "SWF Workload $jobNumber", UnnamedUser, + uuid, + "SWF Workload $jobNumber", + UnnamedUser, VmImage( uuid, jobNumber.toString(), diff --git a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt index 94e4b0fc..41ad8aba 100644 --- a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -1,8 +1,8 @@ package com.atlarge.opendc.format.trace.swf -import java.io.File import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.io.File class SwfTraceReaderTest { @Test diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 807c119e..9cfe5531 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -22,13 +22,13 @@ import com.mongodb.client.MongoClients import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters -import java.io.File -import java.util.* -import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import mu.KotlinLogging import org.bson.Document +import java.io.File +import java.util.* +import kotlin.random.Random private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt index 39092653..c0b0ac31 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt @@ -1,11 +1,11 @@ package com.atlarge.opendc.runner.web -import java.io.File import org.apache.spark.sql.Column import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.* +import java.io.File /** * A helper class for processing the experiment results using Apache Spark. @@ -175,13 +175,19 @@ class ResultProcessor(private val master: String, private val outputPath: File) val sliceLength = 5 * 60 * 1000 val states = map( - lit("ERROR"), lit(1), - lit("ACTIVE"), lit(0), - lit("SHUTOFF"), lit(0) + lit("ERROR"), + lit(1), + lit("ACTIVE"), + lit(0), + lit("SHUTOFF"), + lit(0) ) val oppositeStates = map( - lit("ERROR"), lit(0), - lit("ACTIVE"), lit(1), - lit("SHUTOFF"), lit(1) + lit("ERROR"), + lit(0), + lit("ACTIVE"), + lit(1), + lit("SHUTOFF"), + lit(1) ) } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt index 40ffd282..6ec4995d 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt @@ -3,8 +3,8 @@ package com.atlarge.opendc.runner.web import com.mongodb.client.MongoCollection import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates -import java.time.Instant import org.bson.Document +import java.time.Instant /** * Manages the queue of scenarios that need to be processed. diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt index 499585ec..ab683985 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -20,9 +20,10 @@ import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Field import com.mongodb.client.model.Filters import com.mongodb.client.model.Projections -import java.util.* +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import java.util.* /** * A helper class that converts the MongoDB topology into an OpenDC environment. @@ -31,7 +32,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private /** * Parse the topology with the specified [id]. */ - override suspend fun construct(dom: Domain): Environment { + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock val nodes = mutableListOf<SimpleBareMetalDriver>() val random = Random(0) @@ -59,7 +61,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private } nodes.add( SimpleBareMetalDriver( - dom.newDomain(machineId), + coroutineScope, + clock, UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$position", mapOf(NODE_CLUSTER to clusterId), @@ -73,8 +76,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private ) } - val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) - dom.launch { + val provisioningService = SimpleProvisioningService() + coroutineScope.launch { for (node in nodes) { provisioningService.create(node) } @@ -83,7 +86,9 @@ class TopologyParser(private val collection: MongoCollection<Document>, private val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "opendc-platform", listOf( + UUID.randomUUID(), + "opendc-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 1193f7b2..aea27972 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -24,9 +24,7 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState @@ -39,22 +37,23 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job -import java.util.PriorityQueue -import java.util.Queue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext +import java.time.Clock +import java.util.PriorityQueue +import java.util.Queue /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Topology Scheduling. */ class StageWorkflowService( - private val domain: Domain, + internal val coroutineScope: CoroutineScope, + internal val clock: Clock, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +62,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, CoroutineScope by domain { +) : WorkflowService { /** * The incoming jobs ready to be processed by the scheduler. @@ -175,7 +174,7 @@ class StageWorkflowService( private val eventFlow = EventFlow<WorkflowEvent>() init { - domain.launch { + coroutineScope.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } @@ -191,9 +190,9 @@ class StageWorkflowService( override val events: Flow<WorkflowEvent> = eventFlow - override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { + override suspend fun submit(job: Job) { // J1 Incoming Jobs - val jobInstance = JobState(job, simulationContext.clock.millis()) + val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -241,7 +240,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) rootListener.jobStarted(jobInstance) } @@ -295,7 +294,7 @@ class StageWorkflowService( taskByServer[server] = instance server.events .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(this) + .launchIn(coroutineScope) activeTasks += instance taskQueue.poll() @@ -310,19 +309,19 @@ class StageWorkflowService( when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + task.startedAt = clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, clock.millis())) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() + task.finishedAt = clock.millis() job.tasks.remove(task) available += task.host!! activeTasks -= task - eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, clock.millis())) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -347,7 +346,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) rootListener.jobFinished(job) } diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index 776f0b07..cb075b18 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.workflows.service.stage.StagePolicy import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -66,13 +65,12 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = simulationContext if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. - val delay = quantum - (ctx.clock.millis() % quantum) + val delay = quantum - (scheduler.clock.millis() % quantum) - val job = ctx.domain.launch { + val job = scheduler.coroutineScope.launch { delay(delay) next = null scheduler.schedule() @@ -93,11 +91,10 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = simulationContext if (next == null) { val delay = random.nextInt(200).toLong() - val job = ctx.domain.launch { + val job = scheduler.coroutineScope.launch { delay(delay) next = null scheduler.schedule() diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index a60ba0e2..ad818dde 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey import com.atlarge.opendc.workflows.workload.Job -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A service for cloud workflow management. diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 5c129e37..655d8e1d 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -35,8 +35,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import java.util.ServiceLoader -import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -46,6 +44,8 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. @@ -68,11 +68,13 @@ internal class StageWorkflowSchedulerIntegrationTest { val schedulerDomain = system.newDomain(name = "scheduler") val schedulerAsync = schedulerDomain.async { + val clock = simulationContext.clock val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(system.newDomain("topology")) } + .use { it.construct(schedulerDomain) } StageWorkflowService( schedulerDomain, + clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, |
