summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt2
-rw-r--r--simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt2
-rw-r--r--simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt28
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt69
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt23
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt33
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt221
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt10
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt49
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt19
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt9
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt21
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt6
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt10
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt7
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt30
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt12
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt6
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt16
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt16
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt15
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt11
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt24
-rw-r--r--simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt2
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt6
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt20
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt2
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt19
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt33
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt9
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt8
54 files changed, 446 insertions, 356 deletions
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
index c51d1d8b..3423d43a 100644
--- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
+++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
@@ -24,10 +24,10 @@
package com.atlarge.odcsim
+import org.slf4j.Logger
import java.time.Clock
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import org.slf4j.Logger
/**
* Represents the execution context of a simulation domain.
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
index 0e18f82f..5d9af9ec 100644
--- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
+++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
@@ -24,7 +24,6 @@
package com.atlarge.odcsim.flow
-import java.util.WeakHashMap
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
@@ -33,6 +32,7 @@ import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.consumeAsFlow
+import java.util.WeakHashMap
/**
* A [Flow] that can be used to emit events.
diff --git a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index e675b877..e4d0f4ac 100644
--- a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -28,13 +28,6 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.engine.omega.logging.LoggerImpl
-import java.time.Clock
-import java.time.Instant
-import java.time.ZoneId
-import java.util.PriorityQueue
-import java.util.UUID
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineExceptionHandler
@@ -48,6 +41,13 @@ import kotlinx.coroutines.Runnable
import kotlinx.coroutines.SupervisorJob
import org.jetbrains.annotations.Async
import org.slf4j.Logger
+import java.time.Clock
+import java.time.Instant
+import java.time.ZoneId
+import java.util.PriorityQueue
+import java.util.UUID
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.coroutineContext
/**
* The reference implementation of the [SimulationEngine] instance for the OpenDC simulation core.
@@ -71,12 +71,14 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
/**
* The event queue to process
*/
- private val queue: PriorityQueue<Event> = PriorityQueue(Comparator<Event> { lhs, rhs ->
- // Note that Comparator gives better performance than Comparable according to
- // profiling
- val cmp = lhs.time.compareTo(rhs.time)
- if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp
- })
+ private val queue: PriorityQueue<Event> = PriorityQueue(
+ Comparator<Event> { lhs, rhs ->
+ // Note that Comparator gives better performance than Comparable according to
+// profiling
+ val cmp = lhs.time.compareTo(rhs.time)
+ if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp
+ }
+ )
/**
* The active processes in the simulation engine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index fd0fc836..01968cd8 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A server instance that is running on some physical or virtual machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index f770fa49..817bee4b 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.ServiceKey
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
+import java.time.Clock
/**
* Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
+ * The virtual clock.
+ */
+ public val clock: Clock
+
+ /**
* The server on which the image runs.
*/
public val server: Server
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index c615d865..0e1af093 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.compute.core.image
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import java.util.UUID
@@ -16,8 +15,7 @@ class VmImage(
) : Image {
override suspend fun invoke(ctx: ServerContext) {
- val clock = simulationContext.clock
- var offset = clock.millis()
+ var offset = ctx.clock.millis()
val batch = flopsHistory.map { fragment ->
val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index cb637aea..7cb4c0c5 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 17d8ee53..41cec291 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for the management interface of a bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index a453e459..c118cc3d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
import com.atlarge.opendc.compute.core.Flavor
@@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.lang.Exception
-import java.time.Clock
-import java.util.UUID
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
-import kotlin.random.Random
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
@@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
-import kotlinx.coroutines.withContext
+import java.lang.Exception
+import java.time.Clock
+import java.util.UUID
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
*
- * @param domain The simulation domain the driver runs in.
+ * @param coroutineScope The [CoroutineScope] the driver runs in.
+ * @param clock The virtual clock to keep track of time.
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
* @param metadata The initial metadata of the node.
@@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext
*/
@OptIn(ExperimentalCoroutinesApi::class)
public class SimpleBareMetalDriver(
- private val domain: Domain,
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
uid: UUID,
name: String,
metadata: Map<String, Any>,
@@ -129,14 +128,14 @@ public class SimpleBareMetalDriver(
*/
private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
- override suspend fun init(): Node = withContext(domain.coroutineContext) {
- nodeState.value
+ override suspend fun init(): Node {
+ return nodeState.value
}
- override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ override suspend fun start(): Node {
val node = nodeState.value
if (node.state != NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
val events = EventFlow<ServerEvent>()
@@ -153,13 +152,13 @@ public class SimpleBareMetalDriver(
setNode(node.copy(state = NodeState.BOOT, server = server))
serverContext = BareMetalServerContext(events)
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(): Node {
val node = nodeState.value
if (node.state == NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
// We terminate the image running on the machine
@@ -167,20 +166,20 @@ public class SimpleBareMetalDriver(
serverContext = null
setNode(node.copy(state = NodeState.SHUTOFF, server = null))
- return@withContext node
+ return node
}
- override suspend fun reboot(): Node = withContext(domain.coroutineContext) {
+ override suspend fun reboot(): Node {
stop()
- start()
+ return start()
}
- override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun setImage(image: Image): Node {
setNode(nodeState.value.copy(image = image))
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
+ override suspend fun refresh(): Node = nodeState.value
private fun setNode(value: Node) {
val field = nodeState.value
@@ -207,7 +206,10 @@ public class SimpleBareMetalDriver(
override val server: Server
get() = nodeState.value.server!!
- private val job = domain.launch {
+ override val clock: Clock
+ get() = this@SimpleBareMetalDriver.clock
+
+ private val job = coroutineScope.launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -265,18 +267,13 @@ public class SimpleBareMetalDriver(
private var usageFlush: DisposableHandle? = null
/**
- * Cache the [Clock] for timing.
- */
- private val clock = domain.coroutineContext[SimulationContext]!!.clock
-
- /**
* Cache the [Delay] instance for timing.
*
* XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy.
* XXX Note however that this is an ugly hack which may break in the future.
*/
@OptIn(InternalCoroutinesApi::class)
- private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay
+ private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay
@OptIn(InternalCoroutinesApi::class)
override fun onRun(
@@ -353,10 +350,10 @@ public class SimpleBareMetalDriver(
currentDisposable?.dispose()
// Schedule reset the usage of the machine since the call is returning
- usageFlush = delay.invokeOnTimeout(1, Runnable {
+ usageFlush = delay.invokeOnTimeout(1) {
usageState.value = 0.0
usageFlush = null
- })
+ }
}
select.disposeOnSelect(disposable)
@@ -458,7 +455,7 @@ public class SimpleBareMetalDriver(
}
override val scope: CoroutineScope
- get() = domain
+ get() = coroutineScope
override suspend fun fail() {
serverContext?.unavailable = true
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index f6b236ae..f64f9b5a 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -24,44 +24,41 @@
package com.atlarge.opendc.compute.metal.service
-import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService(val domain: Domain) : ProvisioningService {
+public class SimpleProvisioningService : ProvisioningService {
/**
* The active nodes in this service.
*/
private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
- override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
+ override suspend fun create(driver: BareMetalDriver): Node {
val node = driver.init()
nodes[node] = driver
- return@withContext node
+ return node
}
- override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys }
+ override suspend fun nodes(): Set<Node> = nodes.keys
- override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) {
- return@withContext nodes[node]!!.refresh()
+ override suspend fun refresh(node: Node): Node {
+ return nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun deploy(node: Node, image: Image): Node {
val driver = nodes[node]!!
driver.setImage(image)
- val newNode = driver.reboot()
- return@withContext newNode
+ return driver.reboot()
}
- override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(node: Node): Node {
val driver = nodes[node]!!
- try {
+ return try {
driver.stop()
} catch (e: CancellationException) {
node
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
index 1e7e351f..69b0124d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 607759a8..bd395f0d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
-import java.util.UUID
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
+import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 192db413..df45f440 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.virt.driver
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
@@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
@@ -59,6 +54,17 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
+import mu.KotlinLogging
+import java.time.Clock
+import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
@@ -97,7 +103,7 @@ class SimpleVirtDriver(
scheduler()
} catch (e: Exception) {
if (e !is CancellationException) {
- simulationContext.log.error("Hypervisor scheduler failed", e)
+ logger.error("Hypervisor scheduler failed", e)
}
throw e
}
@@ -117,8 +123,14 @@ class SimpleVirtDriver(
val events = EventFlow<ServerEvent>()
val server = Server(
- UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD,
- ServiceRegistry(), events
+ UUID.randomUUID(),
+ name,
+ emptyMap(),
+ flavor,
+ image,
+ ServerState.BUILD,
+ ServiceRegistry(),
+ events
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events))
@@ -181,7 +193,7 @@ class SimpleVirtDriver(
* The scheduling process of the hypervisor.
*/
private suspend fun scheduler() {
- val clock = simulationContext.clock
+ val clock = hostContext.clock
val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
@@ -561,6 +573,9 @@ class SimpleVirtDriver(
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ override val clock: Clock
+ get() = hostContext.clock
+
init {
vm = Vm(this)
}
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index b1844f67..1002d382 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for a hypervisor running on some host server and communicating with the central compute service to
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 79388bc3..6b2cfc40 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,8 +1,6 @@
package com.atlarge.opendc.compute.virt.service
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
@@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.math.max
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.math.max
private val logger = KotlinLogging.logger {}
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
- public override val allocationPolicy: AllocationPolicy,
- private val ctx: SimulationContext,
- private val provisioningService: ProvisioningService
-) : VirtProvisioningService, CoroutineScope by ctx.domain {
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ private val provisioningService: ProvisioningService,
+ override val allocationPolicy: AllocationPolicy
+) : VirtProvisioningService {
/**
* The hypervisors that have been launched by the service.
*/
@@ -59,11 +53,11 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- public var submittedVms = 0
- public var queuedVms = 0
- public var runningVms = 0
- public var finishedVms = 0
- public var unscheduledVms = 0
+ var submittedVms = 0
+ var queuedVms = 0
+ var runningVms = 0
+ var finishedVms = 0
+ var unscheduledVms = 0
private var maxCores = 0
private var maxMemory = 0L
@@ -81,7 +75,7 @@ class SimpleVirtProvisioningService(
override val events: Flow<VirtProvisioningEvent> = eventFlow
init {
- launch {
+ coroutineScope.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
val hypervisorImage = HypervisorImage
@@ -96,27 +90,29 @@ class SimpleVirtProvisioningService(
}
}
- override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
- availableHypervisors.map { it.driver }.toSet()
+ override suspend fun drivers(): Set<VirtDriver> {
+ return availableHypervisors.map { it.driver }.toSet()
}
override suspend fun deploy(
name: String,
image: Image,
flavor: Flavor
- ): Server = withContext(coroutineContext) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
- ))
-
- suspendCancellableCoroutine<Server> { cont ->
+ ): Server {
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ ++submittedVms,
+ runningVms,
+ finishedVms,
+ ++queuedVms,
+ unscheduledVms
+ )
+ )
+
+ return suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
requestCycle()
@@ -139,9 +135,9 @@ class SimpleVirtProvisioningService(
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (clock.millis() % quantum)
- val call = launch {
+ val call = coroutineScope.launch {
delay(delay)
this@SimpleVirtProvisioningService.call = null
schedule()
@@ -150,7 +146,6 @@ class SimpleVirtProvisioningService(
}
private suspend fun schedule() {
- val clock = simulationContext.clock
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
@@ -159,16 +154,18 @@ class SimpleVirtProvisioningService(
if (selectedHv == null) {
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- ++unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ ++unscheduledVms
+ )
+ )
incomingImages -= imageInstance
@@ -180,7 +177,7 @@ class SimpleVirtProvisioningService(
}
try {
- logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
+ logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -197,16 +194,18 @@ class SimpleVirtProvisioningService(
imageInstance.server = server
imageInstance.continuation.resume(server)
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
+ )
activeImages += imageInstance
server.events
@@ -214,18 +213,20 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
-
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
+
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -238,7 +239,7 @@ class SimpleVirtProvisioningService(
}
}
}
- .launchIn(this)
+ .launchIn(coroutineScope)
} catch (e: InsufficientMemoryOnServerException) {
logger.error("Failed to deploy VM", e)
@@ -254,7 +255,7 @@ class SimpleVirtProvisioningService(
private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" }
if (server in hypervisors) {
// Corner case for when the hypervisor already exists
@@ -272,16 +273,18 @@ class SimpleVirtProvisioningService(
hypervisors[server] = hv
}
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
// Re-schedule on the new machine
if (incomingImages.isNotEmpty()) {
@@ -289,20 +292,22 @@ class SimpleVirtProvisioningService(
}
}
ServerState.SHUTOFF, ServerState.ERROR -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
if (incomingImages.isNotEmpty()) {
requestCycle()
@@ -318,16 +323,18 @@ class SimpleVirtProvisioningService(
hv.driver = server.services[VirtDriver]
availableHypervisors += hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
hv.driver.events
.onEach { event ->
@@ -335,7 +342,7 @@ class SimpleVirtProvisioningService(
hv.numberOfActiveServers = event.numberOfActiveServers
hv.availableMemory = event.availableMemory
}
- }.launchIn(this)
+ }.launchIn(coroutineScope)
requestCycle()
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
index 1c7b751c..417db77d 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
@@ -24,10 +24,10 @@
package com.atlarge.opendc.compute.core.image
-import java.util.UUID
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
+import java.util.UUID
/**
* Test suite for [FlopsApplicationImage]
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index af9d3421..80c9c547 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
internal class SimpleBareMetalDriverTest {
/**
@@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest {
val dom = root.newDomain(name = "driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index ed2256c0..37cd5898 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -25,17 +25,18 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
@@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest {
val system = provider("sim")
val root = system.newDomain(name = "root")
root.launch {
+ val clock = simulationContext.clock
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
- val provisioner = SimpleProvisioningService(dom)
+ val provisioner = SimpleProvisioningService()
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 622b185e..528434b1 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
@@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Basic test-suite for the hypervisor.
@@ -62,6 +63,7 @@ internal class HypervisorTest {
val root = system.newDomain("root")
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
@@ -70,7 +72,7 @@ internal class HypervisorTest {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -108,26 +110,41 @@ internal class HypervisorTest {
var overcommissionedBurst = 0L
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val duration = 5 * 60L
- val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
- ), 2, 0)
- val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
- ), 2, 0)
+ val vmImageA = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ ),
+ 2,
+ 0
+ )
+ val vmImageB = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ ),
+ 2,
+ 0
+ )
val driverDom = root.newDomain("driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
index f77a581e..87d6b7bd 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -24,23 +24,20 @@
package com.atlarge.opendc.core.failure
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.simulationContext
+import kotlinx.coroutines.*
+import java.time.Clock
import kotlin.math.exp
import kotlin.math.max
import kotlin.random.Random
import kotlin.random.asJavaRandom
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.ensureActive
-import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in
* isolation, but will trigger other faults.
*/
public class CorrelatedFaultInjector(
- private val domain: Domain,
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
private val iatScale: Double,
private val iatShape: Double,
private val sizeScale: Double,
@@ -72,7 +69,7 @@ public class CorrelatedFaultInjector(
// Clean up the domain if it finishes
domain.scope.coroutineContext[Job]!!.invokeOnCompletion {
- this@CorrelatedFaultInjector.domain.launch {
+ this@CorrelatedFaultInjector.coroutineScope.launch {
active -= domain
if (active.isEmpty()) {
@@ -86,7 +83,7 @@ public class CorrelatedFaultInjector(
return
}
- job = this.domain.launch {
+ job = this.coroutineScope.launch {
while (active.isNotEmpty()) {
ensureActive()
@@ -94,7 +91,7 @@ public class CorrelatedFaultInjector(
val d = lognvariate(iatScale, iatShape) * 3.6e6
// Handle long overflow
- if (simulationContext.clock.millis() + d <= 0) {
+ if (clock.millis() + d <= 0) {
return@launch
}
@@ -111,7 +108,7 @@ public class CorrelatedFaultInjector(
val df = max(lognvariate(dScale, dShape) * 6e4, 15 * 6e4)
// Handle long overflow
- if (simulationContext.clock.millis() + df <= 0) {
+ if (clock.millis() + df <= 0) {
return@launch
}
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
index 0f62667f..1b896858 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -25,11 +25,11 @@
package com.atlarge.opendc.core.failure
import com.atlarge.odcsim.simulationContext
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlin.math.ln1p
import kotlin.math.pow
import kotlin.random.Random
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index 7659b18e..c7577824 100644
--- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -38,9 +38,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import java.io.File
-import java.util.ServiceLoader
-import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
@@ -48,6 +45,9 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import java.io.File
+import java.util.ServiceLoader
+import kotlin.math.max
/**
* Main entry point of the experiment.
@@ -68,10 +68,11 @@ fun main(args: Array<String>) {
val schedulerDomain = system.newDomain(name = "scheduler")
val schedulerAsync = schedulerDomain.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.construct(system.newDomain("topology")) }
+ .use { it.construct(schedulerDomain) }
StageWorkflowService(
schedulerDomain,
+ simulationContext.clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index ec721ff0..cd85351e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -41,9 +41,9 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.int
+import mu.KotlinLogging
import java.io.File
import java.io.InputStream
-import mu.KotlinLogging
/**
* The logger for this experiment.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index b09c0dbb..3765f307 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc20.experiment
import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
@@ -45,10 +46,6 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
@@ -59,6 +56,10 @@ import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mu.KotlinLogging
+import java.io.File
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
/**
* The logger for this experiment.
@@ -85,7 +86,7 @@ suspend fun createFailureDomain(
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
- simulationContext.domain,
+ simulationContext,
random,
failureInterval
)
@@ -99,11 +100,12 @@ suspend fun createFailureDomain(
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector {
+fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
- domain,
+ simulationContext.domain,
+ simulationContext.clock,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
@@ -137,7 +139,7 @@ suspend fun createProvisioner(
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
+ val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
@@ -219,7 +221,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
domain.launch {
chan.send(Unit)
val server = scheduler.deploy(
- workload.image.name, workload.image,
+ workload.image.name,
+ workload.image,
Flavor(workload.image.maxCores, workload.image.requiredMemory)
)
// Monitor server events
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 1580e4dd..7b42b095 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -38,13 +38,13 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import java.io.File
-import java.util.ServiceLoader
-import kotlin.random.Random
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import mu.KotlinLogging
+import java.io.File
+import java.util.ServiceLoader
+import kotlin.random.Random
/**
* The logger for the experiment scenario.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index b931fef9..a06317cb 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -31,8 +31,8 @@ import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter
-import java.io.File
import mu.KotlinLogging
+import java.io.File
/**
* The logger instance to use.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
index a8ee59a8..ddd64560 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
@@ -25,12 +25,12 @@
package com.atlarge.opendc.experiments.sc20.runner.execution
import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
-import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withContext
+import java.util.concurrent.Executors
/**
* An [ExperimentScheduler] that runs experiments using a local thread pool.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
index 28a19172..3b80276f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler
-import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.runBlocking
+import java.util.concurrent.ConcurrentHashMap
/**
* The default implementation of the [ExperimentRunner] interface.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index afa21f93..0a310027 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -25,17 +25,17 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.Event
-import java.io.Closeable
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
/**
* The logging instance to use.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
index 9fa4e0fb..3bc09435 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
-import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import java.io.File
/**
* A Parquet event writer for [HostEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
index 3d28860c..1f3b0472 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
-import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import java.io.File
/**
* A Parquet event writer for [ProvisionerEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
index 043e4670..98afe3b8 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
-import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import java.io.File
/**
* A Parquet event writer for [RunEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index f9709b9f..f1c1dc25 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -30,12 +30,12 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import java.util.UUID
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
+import java.io.File
+import java.util.UUID
private val logger = KotlinLogging.logger {}
@@ -113,7 +113,8 @@ class Sc20RawParquetTraceReader(private val path: File) {
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
val vmWorkload = VmWorkload(
- uid, id,
+ uid,
+ id,
UnnamedUser,
VmImage(
uid,
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index 8b7b222f..9fa33c3f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -32,14 +32,6 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-import kotlin.random.Random
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
@@ -49,6 +41,14 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
private val logger = KotlinLogging.logger {}
@@ -82,11 +82,14 @@ class Sc20StreamingParquetTraceReader(
if (selectedVms.isEmpty())
null
else
- FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
+ FilterCompat.get(
+ FilterApi.userDefined(
+ FilterApi.binaryColumn("id"),
+ SelectedVmFilter(
+ TreeSet(selectedVms)
+ )
)
- ))
+ )
/**
* A poisonous fragment.
@@ -235,7 +238,8 @@ class Sc20StreamingParquetTraceReader(
Random(random.nextInt())
)
val vmWorkload = VmWorkload(
- uid, "VM Workload $id",
+ uid,
+ "VM Workload $id",
UnnamedUser,
VmImage(
uid,
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 56ddbb6d..a2ce3109 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -37,12 +37,6 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.options.split
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.Random
-import kotlin.math.max
-import kotlin.math.min
import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -51,6 +45,12 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.Random
+import kotlin.math.max
+import kotlin.math.min
/**
* Represents the command for converting traces
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
index a46bb3e6..3a2ed4b7 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -30,9 +30,9 @@ import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.SamplingStrategy
import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
import com.atlarge.opendc.format.trace.TraceEntry
+import mu.KotlinLogging
import java.util.*
import kotlin.random.Random
-import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index a79e9a5a..5ecf7605 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -42,8 +42,6 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import java.util.ServiceLoader
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
@@ -54,6 +52,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import java.io.File
+import java.util.ServiceLoader
/**
* An integration test suite for the SC20 experiments.
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
index 1c3f70e6..4c4dcf37 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.format.environment
-import com.atlarge.odcsim.Domain
import com.atlarge.opendc.core.Environment
+import kotlinx.coroutines.CoroutineScope
import java.io.Closeable
/**
@@ -33,7 +33,7 @@ import java.io.Closeable
*/
interface EnvironmentReader : Closeable {
/**
- * Construct an [Environment] in the specified domain.
+ * Construct an [Environment] in the specified [CoroutineScope].
*/
- suspend fun construct(dom: Domain): Environment
+ suspend fun construct(coroutineScope: CoroutineScope): Environment
}
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index a9aa3337..2b608aef 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.environment.sc18
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -39,6 +39,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
+import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
import java.util.UUID
@@ -55,8 +56,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(dom: Domain): Environment {
- val provisioningDomain = dom.newDomain("provisioner")
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
var counter = 0
val nodes = setup.rooms.flatMap { room ->
@@ -78,7 +79,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
SimpleBareMetalDriver(
- dom.newDomain("node-$counter"),
+ coroutineScope,
+ clock,
UUID.randomUUID(),
"node-${counter++}",
emptyMap(),
@@ -91,14 +93,16 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
- val provisioningService = SimpleProvisioningService(provisioningDomain)
+ val provisioningService = SimpleProvisioningService()
for (node in nodes) {
provisioningService.create(node)
}
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "sc18-platform", listOf(
+ UUID.randomUUID(),
+ "sc18-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index e34ee2dc..49118675 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -38,6 +38,7 @@ import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
+import kotlinx.coroutines.CoroutineScope
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
@@ -56,7 +57,9 @@ class Sc20ClusterEnvironmentReader(
constructor(file: File) : this(FileInputStream(file))
@Suppress("BlockingMethodInNonBlockingContext")
- override suspend fun construct(dom: Domain): Environment {
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
+
var clusterIdCol = 0
var speedCol = 0
var numberOfHostsCol = 0
@@ -105,7 +108,8 @@ class Sc20ClusterEnvironmentReader(
repeat(numberOfHosts) {
nodes.add(
SimpleBareMetalDriver(
- dom.newDomain("node-$clusterId-$it"),
+ coroutineScope,
+ clock,
UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$it",
mapOf(NODE_CLUSTER to clusterId),
@@ -123,7 +127,7 @@ class Sc20ClusterEnvironmentReader(
}
}
- val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ val provisioningService = SimpleProvisioningService()
for (node in nodes) {
provisioningService.create(node)
}
@@ -131,7 +135,9 @@ class Sc20ClusterEnvironmentReader(
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "sc20-platform", listOf(
+ UUID.randomUUID(),
+ "sc20-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index 4b5d6fb7..f22f595f 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -40,6 +40,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
+import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
import java.util.UUID
@@ -55,7 +56,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(dom: Domain): Environment {
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -82,7 +84,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
SimpleBareMetalDriver(
- dom.newDomain("node-$counter"),
+ coroutineScope,
+ clock,
UUID.randomUUID(),
"node-${counter++}",
emptyMap(),
@@ -99,7 +102,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
- val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ val provisioningService = SimpleProvisioningService()
for (node in nodes) {
provisioningService.create(node)
}
@@ -107,7 +110,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "sc20-platform", listOf(
+ UUID.randomUUID(),
+ "sc20-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 1cabc8bc..6ee43b6a 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -129,7 +129,9 @@ class BitbrainsTraceReader(
)
val vmWorkload = VmWorkload(
- uuid, "VM Workload $vmId", UnnamedUser,
+ uuid,
+ "VM Workload $vmId",
+ UnnamedUser,
VmImage(
uuid,
vmId.toString(),
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
index 3a4e2e89..6db3975e 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -120,7 +120,8 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
}
val workflow = entry.workload
val task = Task(
- UUID(0L, taskId), "<unnamed>",
+ UUID(0L, taskId),
+ "<unnamed>",
FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores),
HashSet(),
mapOf(WORKFLOW_TASK_DEADLINE to runtime)
@@ -136,9 +137,11 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
// Fix dependencies and dependents for all tasks
taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- })
+ (task.dependencies as MutableSet<Task>).addAll(
+ dependencies.map { taskId ->
+ tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
+ }
+ )
}
// Create the entry iterator
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 8e34505a..28dc7793 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -164,7 +164,9 @@ class Sc20TraceReader(
Random(random.nextInt())
)
val vmWorkload = VmWorkload(
- uuid, "VM Workload $vmId", UnnamedUser,
+ uuid,
+ "VM Workload $vmId",
+ UnnamedUser,
VmImage(
uuid,
vmId,
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt
index 2f6ce238..f7c74562 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt
@@ -115,7 +115,11 @@ class SwfTraceReader(
for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
flopsHistory.add(
FlopsHistoryFragment(
- tick * 1000L, 0L, sliceDuration * 1000L, 0.0, cores
+ tick * 1000L,
+ 0L,
+ sliceDuration * 1000L,
+ 0.0,
+ cores
)
)
slicedWaitTime += sliceDuration
@@ -129,12 +133,18 @@ class SwfTraceReader(
flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder
flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice
- for (tick in (submitTime + slicedWaitTime)
- until (submitTime + slicedWaitTime + runTime - sliceDuration)
- step sliceDuration) {
+ for (
+ tick in (submitTime + slicedWaitTime)
+ until (submitTime + slicedWaitTime + runTime - sliceDuration)
+ step sliceDuration
+ ) {
flopsHistory.add(
FlopsHistoryFragment(
- tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, 1.0, cores
+ tick * 1000L,
+ flopsFullSlice / sliceDuration,
+ sliceDuration * 1000L,
+ 1.0,
+ cores
)
)
}
@@ -153,7 +163,9 @@ class SwfTraceReader(
val uuid = UUID(0L, jobNumber)
val vmWorkload = VmWorkload(
- uuid, "SWF Workload $jobNumber", UnnamedUser,
+ uuid,
+ "SWF Workload $jobNumber",
+ UnnamedUser,
VmImage(
uuid,
jobNumber.toString(),
diff --git a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 94e4b0fc..41ad8aba 100644
--- a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -1,8 +1,8 @@
package com.atlarge.opendc.format.trace.swf
-import java.io.File
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
+import java.io.File
class SwfTraceReaderTest {
@Test
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
index 807c119e..9cfe5531 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -22,13 +22,13 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
-import java.io.File
-import java.util.*
-import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
import org.bson.Document
+import java.io.File
+import java.util.*
+import kotlin.random.Random
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
index 39092653..c0b0ac31 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
@@ -1,11 +1,11 @@
package com.atlarge.opendc.runner.web
-import java.io.File
import org.apache.spark.sql.Column
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.*
+import java.io.File
/**
* A helper class for processing the experiment results using Apache Spark.
@@ -175,13 +175,19 @@ class ResultProcessor(private val master: String, private val outputPath: File)
val sliceLength = 5 * 60 * 1000
val states = map(
- lit("ERROR"), lit(1),
- lit("ACTIVE"), lit(0),
- lit("SHUTOFF"), lit(0)
+ lit("ERROR"),
+ lit(1),
+ lit("ACTIVE"),
+ lit(0),
+ lit("SHUTOFF"),
+ lit(0)
)
val oppositeStates = map(
- lit("ERROR"), lit(0),
- lit("ACTIVE"), lit(1),
- lit("SHUTOFF"), lit(1)
+ lit("ERROR"),
+ lit(0),
+ lit("ACTIVE"),
+ lit(1),
+ lit("SHUTOFF"),
+ lit(1)
)
}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
index 40ffd282..6ec4995d 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -3,8 +3,8 @@ package com.atlarge.opendc.runner.web
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
-import java.time.Instant
import org.bson.Document
+import java.time.Instant
/**
* Manages the queue of scenarios that need to be processed.
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
index 499585ec..ab683985 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -1,6 +1,6 @@
package com.atlarge.opendc.runner.web
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -20,9 +20,10 @@ import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Field
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Projections
-import java.util.*
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.bson.Document
+import java.util.*
/**
* A helper class that converts the MongoDB topology into an OpenDC environment.
@@ -31,7 +32,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
/**
* Parse the topology with the specified [id].
*/
- override suspend fun construct(dom: Domain): Environment {
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
val nodes = mutableListOf<SimpleBareMetalDriver>()
val random = Random(0)
@@ -59,7 +61,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
}
nodes.add(
SimpleBareMetalDriver(
- dom.newDomain(machineId),
+ coroutineScope,
+ clock,
UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$position",
mapOf(NODE_CLUSTER to clusterId),
@@ -73,8 +76,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
)
}
- val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
- dom.launch {
+ val provisioningService = SimpleProvisioningService()
+ coroutineScope.launch {
for (node in nodes) {
provisioningService.create(node)
}
@@ -83,7 +86,9 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "opendc-platform", listOf(
+ UUID.randomUUID(),
+ "opendc-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 1193f7b2..aea27972 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,9 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
@@ -39,22 +37,23 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
-import java.util.PriorityQueue
-import java.util.Queue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
+import java.time.Clock
+import java.util.PriorityQueue
+import java.util.Queue
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Topology Scheduling.
*/
class StageWorkflowService(
- private val domain: Domain,
+ internal val coroutineScope: CoroutineScope,
+ internal val clock: Clock,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -63,7 +62,7 @@ class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy,
resourceFilterPolicy: ResourceFilterPolicy,
resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowService, CoroutineScope by domain {
+) : WorkflowService {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -175,7 +174,7 @@ class StageWorkflowService(
private val eventFlow = EventFlow<WorkflowEvent>()
init {
- domain.launch {
+ coroutineScope.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -191,9 +190,9 @@ class StageWorkflowService(
override val events: Flow<WorkflowEvent> = eventFlow
- override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
+ override suspend fun submit(job: Job) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, simulationContext.clock.millis())
+ val jobInstance = JobState(job, clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -241,7 +240,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis()))
rootListener.jobStarted(jobInstance)
}
@@ -295,7 +294,7 @@ class StageWorkflowService(
taskByServer[server] = instance
server.events
.onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(this)
+ .launchIn(coroutineScope)
activeTasks += instance
taskQueue.poll()
@@ -310,19 +309,19 @@ class StageWorkflowService(
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ task.startedAt = clock.millis()
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
+ task.finishedAt = clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -347,7 +346,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis()))
rootListener.jobFinished(job)
}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index 776f0b07..cb075b18 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +65,12 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (scheduler.clock.millis() % quantum)
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +91,10 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index a60ba0e2..ad818dde 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
import com.atlarge.opendc.workflows.workload.Job
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A service for cloud workflow management.
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 5c129e37..655d8e1d 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,8 +35,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import java.util.ServiceLoader
-import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -46,6 +44,8 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
@@ -68,11 +68,13 @@ internal class StageWorkflowSchedulerIntegrationTest {
val schedulerDomain = system.newDomain(name = "scheduler")
val schedulerAsync = schedulerDomain.async {
+ val clock = simulationContext.clock
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(system.newDomain("topology")) }
+ .use { it.construct(schedulerDomain) }
StageWorkflowService(
schedulerDomain,
+ clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,