diff options
54 files changed, 446 insertions, 356 deletions
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt index c51d1d8b..3423d43a 100644 --- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt +++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt @@ -24,10 +24,10 @@ package com.atlarge.odcsim +import org.slf4j.Logger import java.time.Clock import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -import org.slf4j.Logger /** * Represents the execution context of a simulation domain. diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt index 0e18f82f..5d9af9ec 100644 --- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt +++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt @@ -24,7 +24,6 @@ package com.atlarge.odcsim.flow -import java.util.WeakHashMap import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi @@ -33,6 +32,7 @@ import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.consumeAsFlow +import java.util.WeakHashMap /** * A [Flow] that can be used to emit events. diff --git a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index e675b877..e4d0f4ac 100644 --- a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -28,13 +28,6 @@ import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.engine.omega.logging.LoggerImpl -import java.time.Clock -import java.time.Instant -import java.time.ZoneId -import java.util.PriorityQueue -import java.util.UUID -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.coroutineContext import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineExceptionHandler @@ -48,6 +41,13 @@ import kotlinx.coroutines.Runnable import kotlinx.coroutines.SupervisorJob import org.jetbrains.annotations.Async import org.slf4j.Logger +import java.time.Clock +import java.time.Instant +import java.time.ZoneId +import java.util.PriorityQueue +import java.util.UUID +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext /** * The reference implementation of the [SimulationEngine] instance for the OpenDC simulation core. @@ -71,12 +71,14 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine /** * The event queue to process */ - private val queue: PriorityQueue<Event> = PriorityQueue(Comparator<Event> { lhs, rhs -> - // Note that Comparator gives better performance than Comparable according to - // profiling - val cmp = lhs.time.compareTo(rhs.time) - if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp - }) + private val queue: PriorityQueue<Event> = PriorityQueue( + Comparator<Event> { lhs, rhs -> + // Note that Comparator gives better performance than Comparable according to +// profiling + val cmp = lhs.time.compareTo(rhs.time) + if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp + } + ) /** * The active processes in the simulation engine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index fd0fc836..01968cd8 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A server instance that is running on some physical or virtual machine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index f770fa49..817bee4b 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select +import java.time.Clock /** * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** + * The virtual clock. + */ + public val clock: Clock + + /** * The server on which the image runs. */ public val server: Server diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index c615d865..0e1af093 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.compute.core.image -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import java.util.UUID @@ -16,8 +15,7 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - val clock = simulationContext.clock - var offset = clock.millis() + var offset = ctx.clock.millis() val batch = flopsHistory.map { fragment -> val cores = min(fragment.cores, ctx.server.flavor.cpuCount) diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index cb637aea..7cb4c0c5 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 17d8ee53..41cec291 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for the management interface of a bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index a453e459..c118cc3d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.flow.StateFlow import com.atlarge.opendc.compute.core.Flavor @@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.lang.Exception -import java.time.Clock -import java.util.UUID -import kotlin.coroutines.ContinuationInterceptor -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle @@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.withContext +import java.lang.Exception +import java.time.Clock +import java.util.UUID +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. * - * @param domain The simulation domain the driver runs in. + * @param coroutineScope The [CoroutineScope] the driver runs in. + * @param clock The virtual clock to keep track of time. * @param uid The unique identifier of the machine. * @param name An optional name of the machine. * @param metadata The initial metadata of the node. @@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext */ @OptIn(ExperimentalCoroutinesApi::class) public class SimpleBareMetalDriver( - private val domain: Domain, + private val coroutineScope: CoroutineScope, + private val clock: Clock, uid: UUID, name: String, metadata: Map<String, Any>, @@ -129,14 +128,14 @@ public class SimpleBareMetalDriver( */ private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) - override suspend fun init(): Node = withContext(domain.coroutineContext) { - nodeState.value + override suspend fun init(): Node { + return nodeState.value } - override suspend fun start(): Node = withContext(domain.coroutineContext) { + override suspend fun start(): Node { val node = nodeState.value if (node.state != NodeState.SHUTOFF) { - return@withContext node + return node } val events = EventFlow<ServerEvent>() @@ -153,13 +152,13 @@ public class SimpleBareMetalDriver( setNode(node.copy(state = NodeState.BOOT, server = server)) serverContext = BareMetalServerContext(events) - return@withContext nodeState.value + return nodeState.value } - override suspend fun stop(): Node = withContext(domain.coroutineContext) { + override suspend fun stop(): Node { val node = nodeState.value if (node.state == NodeState.SHUTOFF) { - return@withContext node + return node } // We terminate the image running on the machine @@ -167,20 +166,20 @@ public class SimpleBareMetalDriver( serverContext = null setNode(node.copy(state = NodeState.SHUTOFF, server = null)) - return@withContext node + return node } - override suspend fun reboot(): Node = withContext(domain.coroutineContext) { + override suspend fun reboot(): Node { stop() - start() + return start() } - override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun setImage(image: Image): Node { setNode(nodeState.value.copy(image = image)) - return@withContext nodeState.value + return nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } + override suspend fun refresh(): Node = nodeState.value private fun setNode(value: Node) { val field = nodeState.value @@ -207,7 +206,10 @@ public class SimpleBareMetalDriver( override val server: Server get() = nodeState.value.server!! - private val job = domain.launch { + override val clock: Clock + get() = this@SimpleBareMetalDriver.clock + + private val job = coroutineScope.launch { delay(1) // TODO Introduce boot time init() try { @@ -265,18 +267,13 @@ public class SimpleBareMetalDriver( private var usageFlush: DisposableHandle? = null /** - * Cache the [Clock] for timing. - */ - private val clock = domain.coroutineContext[SimulationContext]!!.clock - - /** * Cache the [Delay] instance for timing. * * XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy. * XXX Note however that this is an ugly hack which may break in the future. */ @OptIn(InternalCoroutinesApi::class) - private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay + private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay @OptIn(InternalCoroutinesApi::class) override fun onRun( @@ -353,10 +350,10 @@ public class SimpleBareMetalDriver( currentDisposable?.dispose() // Schedule reset the usage of the machine since the call is returning - usageFlush = delay.invokeOnTimeout(1, Runnable { + usageFlush = delay.invokeOnTimeout(1) { usageState.value = 0.0 usageFlush = null - }) + } } select.disposeOnSelect(disposable) @@ -458,7 +455,7 @@ public class SimpleBareMetalDriver( } override val scope: CoroutineScope - get() = domain + get() = coroutineScope override suspend fun fail() { serverContext?.unavailable = true diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index f6b236ae..f64f9b5a 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -24,44 +24,41 @@ package com.atlarge.opendc.compute.metal.service -import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService(val domain: Domain) : ProvisioningService { +public class SimpleProvisioningService : ProvisioningService { /** * The active nodes in this service. */ private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { + override suspend fun create(driver: BareMetalDriver): Node { val node = driver.init() nodes[node] = driver - return@withContext node + return node } - override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys } + override suspend fun nodes(): Set<Node> = nodes.keys - override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) { - return@withContext nodes[node]!!.refresh() + override suspend fun refresh(node: Node): Node { + return nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) { + override suspend fun deploy(node: Node, image: Image): Node { val driver = nodes[node]!! driver.setImage(image) - val newNode = driver.reboot() - return@withContext newNode + return driver.reboot() } - override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) { + override suspend fun stop(node: Node): Node { val driver = nodes[node]!! - try { + return try { driver.stop() } catch (e: CancellationException) { node diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt index 1e7e351f..69b0124d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.core.Identity -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 607759a8..bd395f0d 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.core.resource.TagContainer -import java.util.UUID import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine +import java.util.UUID /** * A hypervisor managing the VMs of a node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 192db413..df45f440 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server @@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DisposableHandle @@ -59,6 +54,17 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select +import mu.KotlinLogging +import java.time.Clock +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} /** * A [VirtDriver] that is backed by a simple hypervisor implementation. @@ -97,7 +103,7 @@ class SimpleVirtDriver( scheduler() } catch (e: Exception) { if (e !is CancellationException) { - simulationContext.log.error("Hypervisor scheduler failed", e) + logger.error("Hypervisor scheduler failed", e) } throw e } @@ -117,8 +123,14 @@ class SimpleVirtDriver( val events = EventFlow<ServerEvent>() val server = Server( - UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD, - ServiceRegistry(), events + UUID.randomUUID(), + name, + emptyMap(), + flavor, + image, + ServerState.BUILD, + ServiceRegistry(), + events ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events)) @@ -181,7 +193,7 @@ class SimpleVirtDriver( * The scheduling process of the hypervisor. */ private suspend fun scheduler() { - val clock = simulationContext.clock + val clock = hostContext.clock val maxUsage = hostContext.cpus.sumByDouble { it.frequency } val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } @@ -561,6 +573,9 @@ class SimpleVirtDriver( override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + override val clock: Clock + get() = hostContext.clock + init { vm = Vm(this) } diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index b1844f67..1002d382 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A driver interface for a hypervisor running on some host server and communicating with the central compute service to diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 79388bc3..6b2cfc40 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,8 +1,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent @@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.math.max -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext import mu.KotlinLogging +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.math.max private val logger = KotlinLogging.logger {} @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( - public override val allocationPolicy: AllocationPolicy, - private val ctx: SimulationContext, - private val provisioningService: ProvisioningService -) : VirtProvisioningService, CoroutineScope by ctx.domain { + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val provisioningService: ProvisioningService, + override val allocationPolicy: AllocationPolicy +) : VirtProvisioningService { /** * The hypervisors that have been launched by the service. */ @@ -59,11 +53,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - public var submittedVms = 0 - public var queuedVms = 0 - public var runningVms = 0 - public var finishedVms = 0 - public var unscheduledVms = 0 + var submittedVms = 0 + var queuedVms = 0 + var runningVms = 0 + var finishedVms = 0 + var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -81,7 +75,7 @@ class SimpleVirtProvisioningService( override val events: Flow<VirtProvisioningEvent> = eventFlow init { - launch { + coroutineScope.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage @@ -96,27 +90,29 @@ class SimpleVirtProvisioningService( } } - override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) { - availableHypervisors.map { it.driver }.toSet() + override suspend fun drivers(): Set<VirtDriver> { + return availableHypervisors.map { it.driver }.toSet() } override suspend fun deploy( name: String, image: Image, flavor: Flavor - ): Server = withContext(coroutineContext) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - )) - - suspendCancellableCoroutine<Server> { cont -> + ): Server { + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + return suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance requestCycle() @@ -139,9 +135,9 @@ class SimpleVirtProvisioningService( // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = quantum - (ctx.clock.millis() % quantum) + val delay = quantum - (clock.millis() % quantum) - val call = launch { + val call = coroutineScope.launch { delay(delay) this@SimpleVirtProvisioningService.call = null schedule() @@ -150,7 +146,6 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { @@ -159,16 +154,18 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - ++unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + ) + ) incomingImages -= imageInstance @@ -180,7 +177,7 @@ class SimpleVirtProvisioningService( } try { - logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } + logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" } incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -197,16 +194,18 @@ class SimpleVirtProvisioningService( imageInstance.server = server imageInstance.continuation.resume(server) - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) activeImages += imageInstance server.events @@ -214,18 +213,20 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - )) + logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } + + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -238,7 +239,7 @@ class SimpleVirtProvisioningService( } } } - .launchIn(this) + .launchIn(coroutineScope) } catch (e: InsufficientMemoryOnServerException) { logger.error("Failed to deploy VM", e) @@ -254,7 +255,7 @@ class SimpleVirtProvisioningService( private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" } if (server in hypervisors) { // Corner case for when the hypervisor already exists @@ -272,16 +273,18 @@ class SimpleVirtProvisioningService( hypervisors[server] = hv } - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) // Re-schedule on the new machine if (incomingImages.isNotEmpty()) { @@ -289,20 +292,22 @@ class SimpleVirtProvisioningService( } } ServerState.SHUTOFF, ServerState.ERROR -> { - logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } + logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" } val hv = hypervisors[server] ?: return availableHypervisors -= hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) if (incomingImages.isNotEmpty()) { requestCycle() @@ -318,16 +323,18 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv - eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( - this@SimpleVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - )) + eventFlow.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + ) hv.driver.events .onEach { event -> @@ -335,7 +342,7 @@ class SimpleVirtProvisioningService( hv.numberOfActiveServers = event.numberOfActiveServers hv.availableMemory = event.availableMemory } - }.launchIn(this) + }.launchIn(coroutineScope) requestCycle() } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt index 1c7b751c..417db77d 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.compute.core.image -import java.util.UUID import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import java.util.UUID /** * Test suite for [FlopsApplicationImage] diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index af9d3421..80c9c547 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID internal class SimpleBareMetalDriverTest { /** @@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index ed2256c0..37cd5898 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -25,17 +25,18 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import java.util.UUID /** * Test suite for the [SimpleProvisioningService]. @@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest { val system = provider("sim") val root = system.newDomain(name = "root") root.launch { + val clock = simulationContext.clock val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - val provisioner = SimpleProvisioningService(dom) + val provisioner = SimpleProvisioningService() provisioner.create(driver) delay(5) val nodes = provisioner.nodes() diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 622b185e..528434b1 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.virt import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import java.util.ServiceLoader -import java.util.UUID import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn @@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import java.util.ServiceLoader +import java.util.UUID /** * Basic test-suite for the hypervisor. @@ -62,6 +63,7 @@ internal class HypervisorTest { val root = system.newDomain("root") root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) @@ -70,7 +72,7 @@ internal class HypervisorTest { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -108,26 +110,41 @@ internal class HypervisorTest { var overcommissionedBurst = 0L root.launch { + val clock = simulationContext.clock val vmm = HypervisorImage val duration = 5 * 60L - val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) - ), 2, 0) - val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf( - FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), - FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), - FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), - FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) - ), 2, 0) + val vmImageA = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2) + ), + 2, + 0 + ) + val vmImageB = VmImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + sequenceOf( + FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2), + FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2), + FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2), + FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2) + ), + 2, + 0 + ) val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt index f77a581e..87d6b7bd 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt @@ -24,23 +24,20 @@ package com.atlarge.opendc.core.failure -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.simulationContext +import kotlinx.coroutines.* +import java.time.Clock import kotlin.math.exp import kotlin.math.max import kotlin.random.Random import kotlin.random.asJavaRandom -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.launch /** * A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in * isolation, but will trigger other faults. */ public class CorrelatedFaultInjector( - private val domain: Domain, + private val coroutineScope: CoroutineScope, + private val clock: Clock, private val iatScale: Double, private val iatShape: Double, private val sizeScale: Double, @@ -72,7 +69,7 @@ public class CorrelatedFaultInjector( // Clean up the domain if it finishes domain.scope.coroutineContext[Job]!!.invokeOnCompletion { - this@CorrelatedFaultInjector.domain.launch { + this@CorrelatedFaultInjector.coroutineScope.launch { active -= domain if (active.isEmpty()) { @@ -86,7 +83,7 @@ public class CorrelatedFaultInjector( return } - job = this.domain.launch { + job = this.coroutineScope.launch { while (active.isNotEmpty()) { ensureActive() @@ -94,7 +91,7 @@ public class CorrelatedFaultInjector( val d = lognvariate(iatScale, iatShape) * 3.6e6 // Handle long overflow - if (simulationContext.clock.millis() + d <= 0) { + if (clock.millis() + d <= 0) { return@launch } @@ -111,7 +108,7 @@ public class CorrelatedFaultInjector( val df = max(lognvariate(dScale, dShape) * 6e4, 15 * 6e4) // Handle long overflow - if (simulationContext.clock.millis() + df <= 0) { + if (clock.millis() + df <= 0) { return@launch } diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 0f62667f..1b896858 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -25,11 +25,11 @@ package com.atlarge.opendc.core.failure import com.atlarge.odcsim.simulationContext +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlin.math.ln1p import kotlin.math.pow import kotlin.random.Random -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch /** * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 7659b18e..c7577824 100644 --- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -38,9 +38,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import java.io.File -import java.util.ServiceLoader -import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay @@ -48,6 +45,9 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import java.io.File +import java.util.ServiceLoader +import kotlin.math.max /** * Main entry point of the experiment. @@ -68,10 +68,11 @@ fun main(args: Array<String>) { val schedulerDomain = system.newDomain(name = "scheduler") val schedulerAsync = schedulerDomain.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.construct(system.newDomain("topology")) } + .use { it.construct(schedulerDomain) } StageWorkflowService( schedulerDomain, + simulationContext.clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index ec721ff0..cd85351e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -41,9 +41,9 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int +import mu.KotlinLogging import java.io.File import java.io.InputStream -import mu.KotlinLogging /** * The logger for this experiment. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index b09c0dbb..3765f307 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc20.experiment import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent @@ -45,10 +46,6 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay @@ -59,6 +56,10 @@ import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import mu.KotlinLogging +import java.io.File +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random /** * The logger for this experiment. @@ -85,7 +86,7 @@ suspend fun createFailureDomain( val injector = injectors.getOrPut(cluster) { createFaultInjector( - simulationContext.domain, + simulationContext, random, failureInterval ) @@ -99,11 +100,12 @@ suspend fun createFailureDomain( /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector { +fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 return CorrelatedFaultInjector( - domain, + simulationContext.domain, + simulationContext.clock, iatScale = ln(failureInterval), iatShape = 1.03, // Hours sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes @@ -137,7 +139,7 @@ suspend fun createProvisioner( // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy) // Wait for the hypervisors to be spawned delay(10) @@ -219,7 +221,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP domain.launch { chan.send(Unit) val server = scheduler.deploy( - workload.image.name, workload.image, + workload.image.name, + workload.image, Flavor(workload.image.maxCores, workload.image.requiredMemory) ) // Monitor server events diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 1580e4dd..7b42b095 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -38,13 +38,13 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import java.io.File -import java.util.ServiceLoader -import kotlin.random.Random import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import mu.KotlinLogging +import java.io.File +import java.util.ServiceLoader +import kotlin.random.Random /** * The logger for the experiment scenario. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index b931fef9..a06317cb 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -31,8 +31,8 @@ import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File import mu.KotlinLogging +import java.io.File /** * The logger instance to use. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt index a8ee59a8..ddd64560 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt @@ -25,12 +25,12 @@ package com.atlarge.opendc.experiments.sc20.runner.execution import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor -import java.util.concurrent.Executors import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.withContext +import java.util.concurrent.Executors /** * An [ExperimentScheduler] that runs experiments using a local thread pool. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt index 28a19172..3b80276f 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler -import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap /** * The default implementation of the [ExperimentRunner] interface. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt index afa21f93..0a310027 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -25,17 +25,17 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.Event -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread /** * The logging instance to use. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt index 9fa4e0fb..3bc09435 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent -import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import java.io.File /** * A Parquet event writer for [HostEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt index 3d28860c..1f3b0472 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent -import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import java.io.File /** * A Parquet event writer for [ProvisionerEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt index 043e4670..98afe3b8 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent -import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import java.io.File /** * A Parquet event writer for [RunEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index f9709b9f..f1c1dc25 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -30,12 +30,12 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import java.util.UUID import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader +import java.io.File +import java.util.UUID private val logger = KotlinLogging.logger {} @@ -113,7 +113,8 @@ class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val vmWorkload = VmWorkload( - uid, id, + uid, + id, UnnamedUser, VmImage( uid, diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 8b7b222f..9fa33c3f 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -32,14 +32,6 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path @@ -49,6 +41,14 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread +import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -82,11 +82,14 @@ class Sc20StreamingParquetTraceReader( if (selectedVms.isEmpty()) null else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) + FilterCompat.get( + FilterApi.userDefined( + FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) ) - )) + ) /** * A poisonous fragment. @@ -235,7 +238,8 @@ class Sc20StreamingParquetTraceReader( Random(random.nextInt()) ) val vmWorkload = VmWorkload( - uid, "VM Workload $id", + uid, + "VM Workload $id", UnnamedUser, VmImage( uid, diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 56ddbb6d..a2ce3109 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -37,12 +37,6 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.options.split import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.Random -import kotlin.math.max -import kotlin.math.min import me.tongfei.progressbar.ProgressBar import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -51,6 +45,12 @@ import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.Random +import kotlin.math.max +import kotlin.math.min /** * Represents the command for converting traces diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index a46bb3e6..3a2ed4b7 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -30,9 +30,9 @@ import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload import com.atlarge.opendc.experiments.sc20.experiment.model.SamplingStrategy import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.format.trace.TraceEntry +import mu.KotlinLogging import java.util.* import kotlin.random.Random -import mu.KotlinLogging private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index a79e9a5a..5ecf7605 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -42,8 +42,6 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import java.io.File -import java.util.ServiceLoader import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -54,6 +52,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import java.io.File +import java.util.ServiceLoader /** * An integration test suite for the SC20 experiments. diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt index 1c3f70e6..4c4dcf37 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.format.environment -import com.atlarge.odcsim.Domain import com.atlarge.opendc.core.Environment +import kotlinx.coroutines.CoroutineScope import java.io.Closeable /** @@ -33,7 +33,7 @@ import java.io.Closeable */ interface EnvironmentReader : Closeable { /** - * Construct an [Environment] in the specified domain. + * Construct an [Environment] in the specified [CoroutineScope]. */ - suspend fun construct(dom: Domain): Environment + suspend fun construct(coroutineScope: CoroutineScope): Environment } diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index a9aa3337..2b608aef 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.environment.sc18 -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -39,6 +39,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.CoroutineScope import java.io.InputStream import java.util.UUID @@ -55,8 +56,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(dom: Domain): Environment { - val provisioningDomain = dom.newDomain("provisioner") + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock var counter = 0 val nodes = setup.rooms.flatMap { room -> @@ -78,7 +79,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } SimpleBareMetalDriver( - dom.newDomain("node-$counter"), + coroutineScope, + clock, UUID.randomUUID(), "node-${counter++}", emptyMap(), @@ -91,14 +93,16 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } - val provisioningService = SimpleProvisioningService(provisioningDomain) + val provisioningService = SimpleProvisioningService() for (node in nodes) { provisioningService.create(node) } val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "sc18-platform", listOf( + UUID.randomUUID(), + "sc18-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index e34ee2dc..49118675 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -38,6 +38,7 @@ import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader +import kotlinx.coroutines.CoroutineScope import java.io.File import java.io.FileInputStream import java.io.InputStream @@ -56,7 +57,9 @@ class Sc20ClusterEnvironmentReader( constructor(file: File) : this(FileInputStream(file)) @Suppress("BlockingMethodInNonBlockingContext") - override suspend fun construct(dom: Domain): Environment { + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock + var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 @@ -105,7 +108,8 @@ class Sc20ClusterEnvironmentReader( repeat(numberOfHosts) { nodes.add( SimpleBareMetalDriver( - dom.newDomain("node-$clusterId-$it"), + coroutineScope, + clock, UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf(NODE_CLUSTER to clusterId), @@ -123,7 +127,7 @@ class Sc20ClusterEnvironmentReader( } } - val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + val provisioningService = SimpleProvisioningService() for (node in nodes) { provisioningService.create(node) } @@ -131,7 +135,9 @@ class Sc20ClusterEnvironmentReader( val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "sc20-platform", listOf( + UUID.randomUUID(), + "sc20-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 4b5d6fb7..f22f595f 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -40,6 +40,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.CoroutineScope import java.io.InputStream import java.util.UUID @@ -55,7 +56,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(dom: Domain): Environment { + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -82,7 +84,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } SimpleBareMetalDriver( - dom.newDomain("node-$counter"), + coroutineScope, + clock, UUID.randomUUID(), "node-${counter++}", emptyMap(), @@ -99,7 +102,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb } } - val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + val provisioningService = SimpleProvisioningService() for (node in nodes) { provisioningService.create(node) } @@ -107,7 +110,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "sc20-platform", listOf( + UUID.randomUUID(), + "sc20-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 1cabc8bc..6ee43b6a 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -129,7 +129,9 @@ class BitbrainsTraceReader( ) val vmWorkload = VmWorkload( - uuid, "VM Workload $vmId", UnnamedUser, + uuid, + "VM Workload $vmId", + UnnamedUser, VmImage( uuid, vmId.toString(), diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt index 3a4e2e89..6db3975e 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -120,7 +120,8 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { } val workflow = entry.workload val task = Task( - UUID(0L, taskId), "<unnamed>", + UUID(0L, taskId), + "<unnamed>", FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores), HashSet(), mapOf(WORKFLOW_TASK_DEADLINE to runtime) @@ -136,9 +137,11 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { // Fix dependencies and dependents for all tasks taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet<Task>).addAll(dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - }) + (task.dependencies as MutableSet<Task>).addAll( + dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + } + ) } // Create the entry iterator diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index 8e34505a..28dc7793 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -164,7 +164,9 @@ class Sc20TraceReader( Random(random.nextInt()) ) val vmWorkload = VmWorkload( - uuid, "VM Workload $vmId", UnnamedUser, + uuid, + "VM Workload $vmId", + UnnamedUser, VmImage( uuid, vmId, diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt index 2f6ce238..f7c74562 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt @@ -115,7 +115,11 @@ class SwfTraceReader( for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { flopsHistory.add( FlopsHistoryFragment( - tick * 1000L, 0L, sliceDuration * 1000L, 0.0, cores + tick * 1000L, + 0L, + sliceDuration * 1000L, + 0.0, + cores ) ) slicedWaitTime += sliceDuration @@ -129,12 +133,18 @@ class SwfTraceReader( flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice - for (tick in (submitTime + slicedWaitTime) - until (submitTime + slicedWaitTime + runTime - sliceDuration) - step sliceDuration) { + for ( + tick in (submitTime + slicedWaitTime) + until (submitTime + slicedWaitTime + runTime - sliceDuration) + step sliceDuration + ) { flopsHistory.add( FlopsHistoryFragment( - tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, 1.0, cores + tick * 1000L, + flopsFullSlice / sliceDuration, + sliceDuration * 1000L, + 1.0, + cores ) ) } @@ -153,7 +163,9 @@ class SwfTraceReader( val uuid = UUID(0L, jobNumber) val vmWorkload = VmWorkload( - uuid, "SWF Workload $jobNumber", UnnamedUser, + uuid, + "SWF Workload $jobNumber", + UnnamedUser, VmImage( uuid, jobNumber.toString(), diff --git a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt index 94e4b0fc..41ad8aba 100644 --- a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -1,8 +1,8 @@ package com.atlarge.opendc.format.trace.swf -import java.io.File import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import java.io.File class SwfTraceReaderTest { @Test diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 807c119e..9cfe5531 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -22,13 +22,13 @@ import com.mongodb.client.MongoClients import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters -import java.io.File -import java.util.* -import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import mu.KotlinLogging import org.bson.Document +import java.io.File +import java.util.* +import kotlin.random.Random private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt index 39092653..c0b0ac31 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt @@ -1,11 +1,11 @@ package com.atlarge.opendc.runner.web -import java.io.File import org.apache.spark.sql.Column import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.* +import java.io.File /** * A helper class for processing the experiment results using Apache Spark. @@ -175,13 +175,19 @@ class ResultProcessor(private val master: String, private val outputPath: File) val sliceLength = 5 * 60 * 1000 val states = map( - lit("ERROR"), lit(1), - lit("ACTIVE"), lit(0), - lit("SHUTOFF"), lit(0) + lit("ERROR"), + lit(1), + lit("ACTIVE"), + lit(0), + lit("SHUTOFF"), + lit(0) ) val oppositeStates = map( - lit("ERROR"), lit(0), - lit("ACTIVE"), lit(1), - lit("SHUTOFF"), lit(1) + lit("ERROR"), + lit(0), + lit("ACTIVE"), + lit(1), + lit("SHUTOFF"), + lit(1) ) } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt index 40ffd282..6ec4995d 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt @@ -3,8 +3,8 @@ package com.atlarge.opendc.runner.web import com.mongodb.client.MongoCollection import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates -import java.time.Instant import org.bson.Document +import java.time.Instant /** * Manages the queue of scenarios that need to be processed. diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt index 499585ec..ab683985 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -20,9 +20,10 @@ import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Field import com.mongodb.client.model.Filters import com.mongodb.client.model.Projections -import java.util.* +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import java.util.* /** * A helper class that converts the MongoDB topology into an OpenDC environment. @@ -31,7 +32,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private /** * Parse the topology with the specified [id]. */ - override suspend fun construct(dom: Domain): Environment { + override suspend fun construct(coroutineScope: CoroutineScope): Environment { + val clock = simulationContext.clock val nodes = mutableListOf<SimpleBareMetalDriver>() val random = Random(0) @@ -59,7 +61,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private } nodes.add( SimpleBareMetalDriver( - dom.newDomain(machineId), + coroutineScope, + clock, UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$position", mapOf(NODE_CLUSTER to clusterId), @@ -73,8 +76,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private ) } - val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) - dom.launch { + val provisioningService = SimpleProvisioningService() + coroutineScope.launch { for (node in nodes) { provisioningService.create(node) } @@ -83,7 +86,9 @@ class TopologyParser(private val collection: MongoCollection<Document>, private val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) val platform = Platform( - UUID.randomUUID(), "opendc-platform", listOf( + UUID.randomUUID(), + "opendc-platform", + listOf( Zone(UUID.randomUUID(), "zone", serviceRegistry) ) ) diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 1193f7b2..aea27972 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -24,9 +24,7 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.Domain import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState @@ -39,22 +37,23 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job -import java.util.PriorityQueue -import java.util.Queue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext +import java.time.Clock +import java.util.PriorityQueue +import java.util.Queue /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Topology Scheduling. */ class StageWorkflowService( - private val domain: Domain, + internal val coroutineScope: CoroutineScope, + internal val clock: Clock, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +62,7 @@ class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy, resourceFilterPolicy: ResourceFilterPolicy, resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowService, CoroutineScope by domain { +) : WorkflowService { /** * The incoming jobs ready to be processed by the scheduler. @@ -175,7 +174,7 @@ class StageWorkflowService( private val eventFlow = EventFlow<WorkflowEvent>() init { - domain.launch { + coroutineScope.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } @@ -191,9 +190,9 @@ class StageWorkflowService( override val events: Flow<WorkflowEvent> = eventFlow - override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { + override suspend fun submit(job: Job) { // J1 Incoming Jobs - val jobInstance = JobState(job, simulationContext.clock.millis()) + val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -241,7 +240,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) rootListener.jobStarted(jobInstance) } @@ -295,7 +294,7 @@ class StageWorkflowService( taskByServer[server] = instance server.events .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(this) + .launchIn(coroutineScope) activeTasks += instance taskQueue.poll() @@ -310,19 +309,19 @@ class StageWorkflowService( when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) - task.startedAt = simulationContext.clock.millis() - eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + task.startedAt = clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, clock.millis())) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job task.state = TaskStatus.FINISHED - task.finishedAt = simulationContext.clock.millis() + task.finishedAt = clock.millis() job.tasks.remove(task) available += task.host!! activeTasks -= task - eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, clock.millis())) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -347,7 +346,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) rootListener.jobFinished(job) } diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index 776f0b07..cb075b18 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.workflows.service.stage.StagePolicy import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -66,13 +65,12 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = simulationContext if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. - val delay = quantum - (ctx.clock.millis() % quantum) + val delay = quantum - (scheduler.clock.millis() % quantum) - val job = ctx.domain.launch { + val job = scheduler.coroutineScope.launch { delay(delay) next = null scheduler.schedule() @@ -93,11 +91,10 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = simulationContext if (next == null) { val delay = random.nextInt(200).toLong() - val job = ctx.domain.launch { + val job = scheduler.coroutineScope.launch { delay(delay) next = null scheduler.schedule() diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index a60ba0e2..ad818dde 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey import com.atlarge.opendc.workflows.workload.Job -import java.util.UUID import kotlinx.coroutines.flow.Flow +import java.util.UUID /** * A service for cloud workflow management. diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 5c129e37..655d8e1d 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -35,8 +35,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import java.util.ServiceLoader -import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -46,6 +44,8 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import java.util.ServiceLoader +import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. @@ -68,11 +68,13 @@ internal class StageWorkflowSchedulerIntegrationTest { val schedulerDomain = system.newDomain(name = "scheduler") val schedulerAsync = schedulerDomain.async { + val clock = simulationContext.clock val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(system.newDomain("topology")) } + .use { it.construct(schedulerDomain) } StageWorkflowService( schedulerDomain, + clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, |
