summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 21:14:20 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:40:57 +0200
commitc41d201343263346ac84855a0b2254051ed33c21 (patch)
tree9141a382f9e1b2d924e9a191e53cc6daa9107563
parentc543f55e961f9f7468e19c1c0f5f20566d07dfb5 (diff)
Eliminate use of Domain and simulationContext in OpenDC
This change takes the first step in eliminating the explict use of Domain and simulationContext from OpenDC. In this way, we decouple the logic of various datacenter services from simulation logic, which should promote re-use.
-rw-r--r--simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt2
-rw-r--r--simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt2
-rw-r--r--simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt28
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt69
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt23
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt33
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt221
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt6
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt10
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt49
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt19
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt9
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt21
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt6
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt10
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt7
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt30
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt12
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt6
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt16
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt16
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt15
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt11
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt24
-rw-r--r--simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt2
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt6
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt20
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt2
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt19
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt33
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt9
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt8
54 files changed, 446 insertions, 356 deletions
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
index c51d1d8b..3423d43a 100644
--- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
+++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
@@ -24,10 +24,10 @@
package com.atlarge.odcsim
+import org.slf4j.Logger
import java.time.Clock
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import org.slf4j.Logger
/**
* Represents the execution context of a simulation domain.
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
index 0e18f82f..5d9af9ec 100644
--- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
+++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
@@ -24,7 +24,6 @@
package com.atlarge.odcsim.flow
-import java.util.WeakHashMap
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
@@ -33,6 +32,7 @@ import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.consumeAsFlow
+import java.util.WeakHashMap
/**
* A [Flow] that can be used to emit events.
diff --git a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index e675b877..e4d0f4ac 100644
--- a/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/simulator/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -28,13 +28,6 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.engine.omega.logging.LoggerImpl
-import java.time.Clock
-import java.time.Instant
-import java.time.ZoneId
-import java.util.PriorityQueue
-import java.util.UUID
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineExceptionHandler
@@ -48,6 +41,13 @@ import kotlinx.coroutines.Runnable
import kotlinx.coroutines.SupervisorJob
import org.jetbrains.annotations.Async
import org.slf4j.Logger
+import java.time.Clock
+import java.time.Instant
+import java.time.ZoneId
+import java.util.PriorityQueue
+import java.util.UUID
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.coroutineContext
/**
* The reference implementation of the [SimulationEngine] instance for the OpenDC simulation core.
@@ -71,12 +71,14 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
/**
* The event queue to process
*/
- private val queue: PriorityQueue<Event> = PriorityQueue(Comparator<Event> { lhs, rhs ->
- // Note that Comparator gives better performance than Comparable according to
- // profiling
- val cmp = lhs.time.compareTo(rhs.time)
- if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp
- })
+ private val queue: PriorityQueue<Event> = PriorityQueue(
+ Comparator<Event> { lhs, rhs ->
+ // Note that Comparator gives better performance than Comparable according to
+// profiling
+ val cmp = lhs.time.compareTo(rhs.time)
+ if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp
+ }
+ )
/**
* The active processes in the simulation engine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index fd0fc836..01968cd8 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A server instance that is running on some physical or virtual machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index f770fa49..817bee4b 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -30,12 +30,18 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.services.ServiceKey
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
+import java.time.Clock
/**
* Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
+ * The virtual clock.
+ */
+ public val clock: Clock
+
+ /**
* The server on which the image runs.
*/
public val server: Server
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index c615d865..0e1af093 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.compute.core.image
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import java.util.UUID
@@ -16,8 +15,7 @@ class VmImage(
) : Image {
override suspend fun invoke(ctx: ServerContext) {
- val clock = simulationContext.clock
- var offset = clock.millis()
+ var offset = ctx.clock.millis()
val batch = flopsHistory.map { fragment ->
val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index cb637aea..7cb4c0c5 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 17d8ee53..41cec291 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for the management interface of a bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index a453e459..c118cc3d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
import com.atlarge.opendc.compute.core.Flavor
@@ -46,14 +44,6 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.lang.Exception
-import java.time.Clock
-import java.util.UUID
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
-import kotlin.random.Random
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
@@ -67,12 +57,20 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
-import kotlinx.coroutines.withContext
+import java.lang.Exception
+import java.time.Clock
+import java.util.UUID
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
*
- * @param domain The simulation domain the driver runs in.
+ * @param coroutineScope The [CoroutineScope] the driver runs in.
+ * @param clock The virtual clock to keep track of time.
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
* @param metadata The initial metadata of the node.
@@ -82,7 +80,8 @@ import kotlinx.coroutines.withContext
*/
@OptIn(ExperimentalCoroutinesApi::class)
public class SimpleBareMetalDriver(
- private val domain: Domain,
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
uid: UUID,
name: String,
metadata: Map<String, Any>,
@@ -129,14 +128,14 @@ public class SimpleBareMetalDriver(
*/
private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
- override suspend fun init(): Node = withContext(domain.coroutineContext) {
- nodeState.value
+ override suspend fun init(): Node {
+ return nodeState.value
}
- override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ override suspend fun start(): Node {
val node = nodeState.value
if (node.state != NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
val events = EventFlow<ServerEvent>()
@@ -153,13 +152,13 @@ public class SimpleBareMetalDriver(
setNode(node.copy(state = NodeState.BOOT, server = server))
serverContext = BareMetalServerContext(events)
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(): Node {
val node = nodeState.value
if (node.state == NodeState.SHUTOFF) {
- return@withContext node
+ return node
}
// We terminate the image running on the machine
@@ -167,20 +166,20 @@ public class SimpleBareMetalDriver(
serverContext = null
setNode(node.copy(state = NodeState.SHUTOFF, server = null))
- return@withContext node
+ return node
}
- override suspend fun reboot(): Node = withContext(domain.coroutineContext) {
+ override suspend fun reboot(): Node {
stop()
- start()
+ return start()
}
- override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun setImage(image: Image): Node {
setNode(nodeState.value.copy(image = image))
- return@withContext nodeState.value
+ return nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
+ override suspend fun refresh(): Node = nodeState.value
private fun setNode(value: Node) {
val field = nodeState.value
@@ -207,7 +206,10 @@ public class SimpleBareMetalDriver(
override val server: Server
get() = nodeState.value.server!!
- private val job = domain.launch {
+ override val clock: Clock
+ get() = this@SimpleBareMetalDriver.clock
+
+ private val job = coroutineScope.launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -265,18 +267,13 @@ public class SimpleBareMetalDriver(
private var usageFlush: DisposableHandle? = null
/**
- * Cache the [Clock] for timing.
- */
- private val clock = domain.coroutineContext[SimulationContext]!!.clock
-
- /**
* Cache the [Delay] instance for timing.
*
* XXX We need to cache this before the call to [onRun] since doing this in [onRun] is too heavy.
* XXX Note however that this is an ugly hack which may break in the future.
*/
@OptIn(InternalCoroutinesApi::class)
- private val delay = domain.coroutineContext[ContinuationInterceptor] as Delay
+ private val delay = coroutineScope.coroutineContext[ContinuationInterceptor] as Delay
@OptIn(InternalCoroutinesApi::class)
override fun onRun(
@@ -353,10 +350,10 @@ public class SimpleBareMetalDriver(
currentDisposable?.dispose()
// Schedule reset the usage of the machine since the call is returning
- usageFlush = delay.invokeOnTimeout(1, Runnable {
+ usageFlush = delay.invokeOnTimeout(1) {
usageState.value = 0.0
usageFlush = null
- })
+ }
}
select.disposeOnSelect(disposable)
@@ -458,7 +455,7 @@ public class SimpleBareMetalDriver(
}
override val scope: CoroutineScope
- get() = domain
+ get() = coroutineScope
override suspend fun fail() {
serverContext?.unavailable = true
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index f6b236ae..f64f9b5a 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -24,44 +24,41 @@
package com.atlarge.opendc.compute.metal.service
-import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService(val domain: Domain) : ProvisioningService {
+public class SimpleProvisioningService : ProvisioningService {
/**
* The active nodes in this service.
*/
private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
- override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
+ override suspend fun create(driver: BareMetalDriver): Node {
val node = driver.init()
nodes[node] = driver
- return@withContext node
+ return node
}
- override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys }
+ override suspend fun nodes(): Set<Node> = nodes.keys
- override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) {
- return@withContext nodes[node]!!.refresh()
+ override suspend fun refresh(node: Node): Node {
+ return nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) {
+ override suspend fun deploy(node: Node, image: Image): Node {
val driver = nodes[node]!!
driver.setImage(image)
- val newNode = driver.reboot()
- return@withContext newNode
+ return driver.reboot()
}
- override suspend fun stop(node: Node): Node = withContext(domain.coroutineContext) {
+ override suspend fun stop(node: Node): Node {
val driver = nodes[node]!!
- try {
+ return try {
driver.stop()
} catch (e: CancellationException) {
node
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
index 1e7e351f..69b0124d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.core.Identity
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 607759a8..bd395f0d 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
-import java.util.UUID
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
+import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 192db413..df45f440 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.virt.driver
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
@@ -40,10 +39,6 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
@@ -59,6 +54,17 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
+import mu.KotlinLogging
+import java.time.Clock
+import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
@@ -97,7 +103,7 @@ class SimpleVirtDriver(
scheduler()
} catch (e: Exception) {
if (e !is CancellationException) {
- simulationContext.log.error("Hypervisor scheduler failed", e)
+ logger.error("Hypervisor scheduler failed", e)
}
throw e
}
@@ -117,8 +123,14 @@ class SimpleVirtDriver(
val events = EventFlow<ServerEvent>()
val server = Server(
- UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD,
- ServiceRegistry(), events
+ UUID.randomUUID(),
+ name,
+ emptyMap(),
+ flavor,
+ image,
+ ServerState.BUILD,
+ ServiceRegistry(),
+ events
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events))
@@ -181,7 +193,7 @@ class SimpleVirtDriver(
* The scheduling process of the hypervisor.
*/
private suspend fun scheduler() {
- val clock = simulationContext.clock
+ val clock = hostContext.clock
val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
@@ -561,6 +573,9 @@ class SimpleVirtDriver(
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ override val clock: Clock
+ get() = hostContext.clock
+
init {
vm = Vm(this)
}
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index b1844f67..1002d382 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A driver interface for a hypervisor running on some host server and communicating with the central compute service to
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 79388bc3..6b2cfc40 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,8 +1,6 @@
package com.atlarge.opendc.compute.virt.service
-import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
@@ -16,29 +14,25 @@ import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerExceptio
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.math.max
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.math.max
private val logger = KotlinLogging.logger {}
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
- public override val allocationPolicy: AllocationPolicy,
- private val ctx: SimulationContext,
- private val provisioningService: ProvisioningService
-) : VirtProvisioningService, CoroutineScope by ctx.domain {
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
+ private val provisioningService: ProvisioningService,
+ override val allocationPolicy: AllocationPolicy
+) : VirtProvisioningService {
/**
* The hypervisors that have been launched by the service.
*/
@@ -59,11 +53,11 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- public var submittedVms = 0
- public var queuedVms = 0
- public var runningVms = 0
- public var finishedVms = 0
- public var unscheduledVms = 0
+ var submittedVms = 0
+ var queuedVms = 0
+ var runningVms = 0
+ var finishedVms = 0
+ var unscheduledVms = 0
private var maxCores = 0
private var maxMemory = 0L
@@ -81,7 +75,7 @@ class SimpleVirtProvisioningService(
override val events: Flow<VirtProvisioningEvent> = eventFlow
init {
- launch {
+ coroutineScope.launch {
val provisionedNodes = provisioningService.nodes()
provisionedNodes.forEach { node ->
val hypervisorImage = HypervisorImage
@@ -96,27 +90,29 @@ class SimpleVirtProvisioningService(
}
}
- override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
- availableHypervisors.map { it.driver }.toSet()
+ override suspend fun drivers(): Set<VirtDriver> {
+ return availableHypervisors.map { it.driver }.toSet()
}
override suspend fun deploy(
name: String,
image: Image,
flavor: Flavor
- ): Server = withContext(coroutineContext) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
- ))
-
- suspendCancellableCoroutine<Server> { cont ->
+ ): Server {
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ ++submittedVms,
+ runningVms,
+ finishedVms,
+ ++queuedVms,
+ unscheduledVms
+ )
+ )
+
+ return suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
requestCycle()
@@ -139,9 +135,9 @@ class SimpleVirtProvisioningService(
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (clock.millis() % quantum)
- val call = launch {
+ val call = coroutineScope.launch {
delay(delay)
this@SimpleVirtProvisioningService.call = null
schedule()
@@ -150,7 +146,6 @@ class SimpleVirtProvisioningService(
}
private suspend fun schedule() {
- val clock = simulationContext.clock
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
@@ -159,16 +154,18 @@ class SimpleVirtProvisioningService(
if (selectedHv == null) {
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- ++unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ ++unscheduledVms
+ )
+ )
incomingImages -= imageInstance
@@ -180,7 +177,7 @@ class SimpleVirtProvisioningService(
}
try {
- logger.info { "[${ctx.clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
+ logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -197,16 +194,18 @@ class SimpleVirtProvisioningService(
imageInstance.server = server
imageInstance.continuation.resume(server)
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
+ )
activeImages += imageInstance
server.events
@@ -214,18 +213,20 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "[${ctx.clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
-
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
+
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -238,7 +239,7 @@ class SimpleVirtProvisioningService(
}
}
}
- .launchIn(this)
+ .launchIn(coroutineScope)
} catch (e: InsufficientMemoryOnServerException) {
logger.error("Failed to deploy VM", e)
@@ -254,7 +255,7 @@ class SimpleVirtProvisioningService(
private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} available: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" }
if (server in hypervisors) {
// Corner case for when the hypervisor already exists
@@ -272,16 +273,18 @@ class SimpleVirtProvisioningService(
hypervisors[server] = hv
}
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
// Re-schedule on the new machine
if (incomingImages.isNotEmpty()) {
@@ -289,20 +292,22 @@ class SimpleVirtProvisioningService(
}
}
ServerState.SHUTOFF, ServerState.ERROR -> {
- logger.debug { "[${ctx.clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
+ logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
if (incomingImages.isNotEmpty()) {
requestCycle()
@@ -318,16 +323,18 @@ class SimpleVirtProvisioningService(
hv.driver = server.services[VirtDriver]
availableHypervisors += hv
- eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
- this@SimpleVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- ))
+ eventFlow.emit(
+ VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
hv.driver.events
.onEach { event ->
@@ -335,7 +342,7 @@ class SimpleVirtProvisioningService(
hv.numberOfActiveServers = event.numberOfActiveServers
hv.availableMemory = event.availableMemory
}
- }.launchIn(this)
+ }.launchIn(coroutineScope)
requestCycle()
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
index 1c7b751c..417db77d 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
@@ -24,10 +24,10 @@
package com.atlarge.opendc.compute.core.image
-import java.util.UUID
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
+import java.util.UUID
/**
* Test suite for [FlopsApplicationImage]
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index af9d3421..80c9c547 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -31,8 +31,6 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@@ -41,6 +39,8 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
internal class SimpleBareMetalDriverTest {
/**
@@ -56,7 +56,7 @@ internal class SimpleBareMetalDriverTest {
val dom = root.newDomain(name = "driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index ed2256c0..37cd5898 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -25,17 +25,18 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
@@ -50,14 +51,15 @@ internal class SimpleProvisioningServiceTest {
val system = provider("sim")
val root = system.newDomain(name = "root")
root.launch {
+ val clock = simulationContext.clock
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
- val provisioner = SimpleProvisioningService(dom)
+ val provisioner = SimpleProvisioningService()
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 622b185e..528434b1 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -33,8 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import java.util.ServiceLoader
-import java.util.UUID
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
@@ -45,6 +44,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import java.util.ServiceLoader
+import java.util.UUID
/**
* Basic test-suite for the hypervisor.
@@ -62,6 +63,7 @@ internal class HypervisorTest {
val root = system.newDomain("root")
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
@@ -70,7 +72,7 @@ internal class HypervisorTest {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -108,26 +110,41 @@ internal class HypervisorTest {
var overcommissionedBurst = 0L
root.launch {
+ val clock = simulationContext.clock
val vmm = HypervisorImage
val duration = 5 * 60L
- val vmImageA = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
- ), 2, 0)
- val vmImageB = VmImage(UUID.randomUUID(), "<unnamed>", emptyMap(), sequenceOf(
- FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
- FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
- FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
- FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
- ), 2, 0)
+ val vmImageA = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3500L * duration, duration * 1000, 3500.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 183L * duration, duration * 1000, 183.0, 2)
+ ),
+ 2,
+ 0
+ )
+ val vmImageB = VmImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ sequenceOf(
+ FlopsHistoryFragment(0, 28L * duration, duration * 1000, 28.0, 2),
+ FlopsHistoryFragment(0, 3100L * duration, duration * 1000, 3100.0, 2),
+ FlopsHistoryFragment(0, 0, duration * 1000, 0.0, 2),
+ FlopsHistoryFragment(0, 73L * duration, duration * 1000, 73.0, 2)
+ ),
+ 2,
+ 0
+ )
val driverDom = root.newDomain("driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
index f77a581e..87d6b7bd 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -24,23 +24,20 @@
package com.atlarge.opendc.core.failure
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.simulationContext
+import kotlinx.coroutines.*
+import java.time.Clock
import kotlin.math.exp
import kotlin.math.max
import kotlin.random.Random
import kotlin.random.asJavaRandom
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.ensureActive
-import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in
* isolation, but will trigger other faults.
*/
public class CorrelatedFaultInjector(
- private val domain: Domain,
+ private val coroutineScope: CoroutineScope,
+ private val clock: Clock,
private val iatScale: Double,
private val iatShape: Double,
private val sizeScale: Double,
@@ -72,7 +69,7 @@ public class CorrelatedFaultInjector(
// Clean up the domain if it finishes
domain.scope.coroutineContext[Job]!!.invokeOnCompletion {
- this@CorrelatedFaultInjector.domain.launch {
+ this@CorrelatedFaultInjector.coroutineScope.launch {
active -= domain
if (active.isEmpty()) {
@@ -86,7 +83,7 @@ public class CorrelatedFaultInjector(
return
}
- job = this.domain.launch {
+ job = this.coroutineScope.launch {
while (active.isNotEmpty()) {
ensureActive()
@@ -94,7 +91,7 @@ public class CorrelatedFaultInjector(
val d = lognvariate(iatScale, iatShape) * 3.6e6
// Handle long overflow
- if (simulationContext.clock.millis() + d <= 0) {
+ if (clock.millis() + d <= 0) {
return@launch
}
@@ -111,7 +108,7 @@ public class CorrelatedFaultInjector(
val df = max(lognvariate(dScale, dShape) * 6e4, 15 * 6e4)
// Handle long overflow
- if (simulationContext.clock.millis() + df <= 0) {
+ if (clock.millis() + df <= 0) {
return@launch
}
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
index 0f62667f..1b896858 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -25,11 +25,11 @@
package com.atlarge.opendc.core.failure
import com.atlarge.odcsim.simulationContext
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlin.math.ln1p
import kotlin.math.pow
import kotlin.random.Random
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index 7659b18e..c7577824 100644
--- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -38,9 +38,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import java.io.File
-import java.util.ServiceLoader
-import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
@@ -48,6 +45,9 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import java.io.File
+import java.util.ServiceLoader
+import kotlin.math.max
/**
* Main entry point of the experiment.
@@ -68,10 +68,11 @@ fun main(args: Array<String>) {
val schedulerDomain = system.newDomain(name = "scheduler")
val schedulerAsync = schedulerDomain.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.construct(system.newDomain("topology")) }
+ .use { it.construct(schedulerDomain) }
StageWorkflowService(
schedulerDomain,
+ simulationContext.clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index ec721ff0..cd85351e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -41,9 +41,9 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.int
+import mu.KotlinLogging
import java.io.File
import java.io.InputStream
-import mu.KotlinLogging
/**
* The logger for this experiment.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index b09c0dbb..3765f307 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc20.experiment
import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
@@ -45,10 +46,6 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
@@ -59,6 +56,10 @@ import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mu.KotlinLogging
+import java.io.File
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
/**
* The logger for this experiment.
@@ -85,7 +86,7 @@ suspend fun createFailureDomain(
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
- simulationContext.domain,
+ simulationContext,
random,
failureInterval
)
@@ -99,11 +100,12 @@ suspend fun createFailureDomain(
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector {
+fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
- domain,
+ simulationContext.domain,
+ simulationContext.clock,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
@@ -137,7 +139,7 @@ suspend fun createProvisioner(
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner)
+ val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
@@ -219,7 +221,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
domain.launch {
chan.send(Unit)
val server = scheduler.deploy(
- workload.image.name, workload.image,
+ workload.image.name,
+ workload.image,
Flavor(workload.image.maxCores, workload.image.requiredMemory)
)
// Monitor server events
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 1580e4dd..7b42b095 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -38,13 +38,13 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import java.io.File
-import java.util.ServiceLoader
-import kotlin.random.Random
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import mu.KotlinLogging
+import java.io.File
+import java.util.ServiceLoader
+import kotlin.random.Random
/**
* The logger for the experiment scenario.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index b931fef9..a06317cb 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -31,8 +31,8 @@ import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter
-import java.io.File
import mu.KotlinLogging
+import java.io.File
/**
* The logger instance to use.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
index a8ee59a8..ddd64560 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
@@ -25,12 +25,12 @@
package com.atlarge.opendc.experiments.sc20.runner.execution
import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
-import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withContext
+import java.util.concurrent.Executors
/**
* An [ExperimentScheduler] that runs experiments using a local thread pool.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
index 28a19172..3b80276f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler
-import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.runBlocking
+import java.util.concurrent.ConcurrentHashMap
/**
* The default implementation of the [ExperimentRunner] interface.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index afa21f93..0a310027 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -25,17 +25,17 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.Event
-import java.io.Closeable
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
/**
* The logging instance to use.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
index 9fa4e0fb..3bc09435 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
-import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import java.io.File
/**
* A Parquet event writer for [HostEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
index 3d28860c..1f3b0472 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
-import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import java.io.File
/**
* A Parquet event writer for [ProvisionerEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
index 043e4670..98afe3b8 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
-import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import java.io.File
/**
* A Parquet event writer for [RunEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index f9709b9f..f1c1dc25 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -30,12 +30,12 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import java.util.UUID
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
+import java.io.File
+import java.util.UUID
private val logger = KotlinLogging.logger {}
@@ -113,7 +113,8 @@ class Sc20RawParquetTraceReader(private val path: File) {
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
val vmWorkload = VmWorkload(
- uid, id,
+ uid,
+ id,
UnnamedUser,
VmImage(
uid,
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index 8b7b222f..9fa33c3f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -32,14 +32,6 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-import kotlin.random.Random
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
@@ -49,6 +41,14 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
private val logger = KotlinLogging.logger {}
@@ -82,11 +82,14 @@ class Sc20StreamingParquetTraceReader(
if (selectedVms.isEmpty())
null
else
- FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
+ FilterCompat.get(
+ FilterApi.userDefined(
+ FilterApi.binaryColumn("id"),
+ SelectedVmFilter(
+ TreeSet(selectedVms)
+ )
)
- ))
+ )
/**
* A poisonous fragment.
@@ -235,7 +238,8 @@ class Sc20StreamingParquetTraceReader(
Random(random.nextInt())
)
val vmWorkload = VmWorkload(
- uid, "VM Workload $id",
+ uid,
+ "VM Workload $id",
UnnamedUser,
VmImage(
uid,
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 56ddbb6d..a2ce3109 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -37,12 +37,6 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.options.split
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.Random
-import kotlin.math.max
-import kotlin.math.min
import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -51,6 +45,12 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.Random
+import kotlin.math.max
+import kotlin.math.min
/**
* Represents the command for converting traces
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
index a46bb3e6..3a2ed4b7 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -30,9 +30,9 @@ import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.SamplingStrategy
import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
import com.atlarge.opendc.format.trace.TraceEntry
+import mu.KotlinLogging
import java.util.*
import kotlin.random.Random
-import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index a79e9a5a..5ecf7605 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -42,8 +42,6 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import java.io.File
-import java.util.ServiceLoader
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
@@ -54,6 +52,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import java.io.File
+import java.util.ServiceLoader
/**
* An integration test suite for the SC20 experiments.
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
index 1c3f70e6..4c4dcf37 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.format.environment
-import com.atlarge.odcsim.Domain
import com.atlarge.opendc.core.Environment
+import kotlinx.coroutines.CoroutineScope
import java.io.Closeable
/**
@@ -33,7 +33,7 @@ import java.io.Closeable
*/
interface EnvironmentReader : Closeable {
/**
- * Construct an [Environment] in the specified domain.
+ * Construct an [Environment] in the specified [CoroutineScope].
*/
- suspend fun construct(dom: Domain): Environment
+ suspend fun construct(coroutineScope: CoroutineScope): Environment
}
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index a9aa3337..2b608aef 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.environment.sc18
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -39,6 +39,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
+import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
import java.util.UUID
@@ -55,8 +56,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(dom: Domain): Environment {
- val provisioningDomain = dom.newDomain("provisioner")
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
var counter = 0
val nodes = setup.rooms.flatMap { room ->
@@ -78,7 +79,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
SimpleBareMetalDriver(
- dom.newDomain("node-$counter"),
+ coroutineScope,
+ clock,
UUID.randomUUID(),
"node-${counter++}",
emptyMap(),
@@ -91,14 +93,16 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
- val provisioningService = SimpleProvisioningService(provisioningDomain)
+ val provisioningService = SimpleProvisioningService()
for (node in nodes) {
provisioningService.create(node)
}
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "sc18-platform", listOf(
+ UUID.randomUUID(),
+ "sc18-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index e34ee2dc..49118675 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -38,6 +38,7 @@ import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
+import kotlinx.coroutines.CoroutineScope
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
@@ -56,7 +57,9 @@ class Sc20ClusterEnvironmentReader(
constructor(file: File) : this(FileInputStream(file))
@Suppress("BlockingMethodInNonBlockingContext")
- override suspend fun construct(dom: Domain): Environment {
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
+
var clusterIdCol = 0
var speedCol = 0
var numberOfHostsCol = 0
@@ -105,7 +108,8 @@ class Sc20ClusterEnvironmentReader(
repeat(numberOfHosts) {
nodes.add(
SimpleBareMetalDriver(
- dom.newDomain("node-$clusterId-$it"),
+ coroutineScope,
+ clock,
UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$it",
mapOf(NODE_CLUSTER to clusterId),
@@ -123,7 +127,7 @@ class Sc20ClusterEnvironmentReader(
}
}
- val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ val provisioningService = SimpleProvisioningService()
for (node in nodes) {
provisioningService.create(node)
}
@@ -131,7 +135,9 @@ class Sc20ClusterEnvironmentReader(
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "sc20-platform", listOf(
+ UUID.randomUUID(),
+ "sc20-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index 4b5d6fb7..f22f595f 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -40,6 +40,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
+import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
import java.util.UUID
@@ -55,7 +56,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(dom: Domain): Environment {
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -82,7 +84,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
SimpleBareMetalDriver(
- dom.newDomain("node-$counter"),
+ coroutineScope,
+ clock,
UUID.randomUUID(),
"node-${counter++}",
emptyMap(),
@@ -99,7 +102,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
}
}
- val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ val provisioningService = SimpleProvisioningService()
for (node in nodes) {
provisioningService.create(node)
}
@@ -107,7 +110,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "sc20-platform", listOf(
+ UUID.randomUUID(),
+ "sc20-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 1cabc8bc..6ee43b6a 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -129,7 +129,9 @@ class BitbrainsTraceReader(
)
val vmWorkload = VmWorkload(
- uuid, "VM Workload $vmId", UnnamedUser,
+ uuid,
+ "VM Workload $vmId",
+ UnnamedUser,
VmImage(
uuid,
vmId.toString(),
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
index 3a4e2e89..6db3975e 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -120,7 +120,8 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
}
val workflow = entry.workload
val task = Task(
- UUID(0L, taskId), "<unnamed>",
+ UUID(0L, taskId),
+ "<unnamed>",
FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores),
HashSet(),
mapOf(WORKFLOW_TASK_DEADLINE to runtime)
@@ -136,9 +137,11 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
// Fix dependencies and dependents for all tasks
taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- })
+ (task.dependencies as MutableSet<Task>).addAll(
+ dependencies.map { taskId ->
+ tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
+ }
+ )
}
// Create the entry iterator
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 8e34505a..28dc7793 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -164,7 +164,9 @@ class Sc20TraceReader(
Random(random.nextInt())
)
val vmWorkload = VmWorkload(
- uuid, "VM Workload $vmId", UnnamedUser,
+ uuid,
+ "VM Workload $vmId",
+ UnnamedUser,
VmImage(
uuid,
vmId,
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt
index 2f6ce238..f7c74562 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt
@@ -115,7 +115,11 @@ class SwfTraceReader(
for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
flopsHistory.add(
FlopsHistoryFragment(
- tick * 1000L, 0L, sliceDuration * 1000L, 0.0, cores
+ tick * 1000L,
+ 0L,
+ sliceDuration * 1000L,
+ 0.0,
+ cores
)
)
slicedWaitTime += sliceDuration
@@ -129,12 +133,18 @@ class SwfTraceReader(
flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder
flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice
- for (tick in (submitTime + slicedWaitTime)
- until (submitTime + slicedWaitTime + runTime - sliceDuration)
- step sliceDuration) {
+ for (
+ tick in (submitTime + slicedWaitTime)
+ until (submitTime + slicedWaitTime + runTime - sliceDuration)
+ step sliceDuration
+ ) {
flopsHistory.add(
FlopsHistoryFragment(
- tick * 1000L, flopsFullSlice / sliceDuration, sliceDuration * 1000L, 1.0, cores
+ tick * 1000L,
+ flopsFullSlice / sliceDuration,
+ sliceDuration * 1000L,
+ 1.0,
+ cores
)
)
}
@@ -153,7 +163,9 @@ class SwfTraceReader(
val uuid = UUID(0L, jobNumber)
val vmWorkload = VmWorkload(
- uuid, "SWF Workload $jobNumber", UnnamedUser,
+ uuid,
+ "SWF Workload $jobNumber",
+ UnnamedUser,
VmImage(
uuid,
jobNumber.toString(),
diff --git a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 94e4b0fc..41ad8aba 100644
--- a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -1,8 +1,8 @@
package com.atlarge.opendc.format.trace.swf
-import java.io.File
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
+import java.io.File
class SwfTraceReaderTest {
@Test
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
index 807c119e..9cfe5531 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -22,13 +22,13 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
-import java.io.File
-import java.util.*
-import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
import org.bson.Document
+import java.io.File
+import java.util.*
+import kotlin.random.Random
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
index 39092653..c0b0ac31 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
@@ -1,11 +1,11 @@
package com.atlarge.opendc.runner.web
-import java.io.File
import org.apache.spark.sql.Column
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.*
+import java.io.File
/**
* A helper class for processing the experiment results using Apache Spark.
@@ -175,13 +175,19 @@ class ResultProcessor(private val master: String, private val outputPath: File)
val sliceLength = 5 * 60 * 1000
val states = map(
- lit("ERROR"), lit(1),
- lit("ACTIVE"), lit(0),
- lit("SHUTOFF"), lit(0)
+ lit("ERROR"),
+ lit(1),
+ lit("ACTIVE"),
+ lit(0),
+ lit("SHUTOFF"),
+ lit(0)
)
val oppositeStates = map(
- lit("ERROR"), lit(0),
- lit("ACTIVE"), lit(1),
- lit("SHUTOFF"), lit(1)
+ lit("ERROR"),
+ lit(0),
+ lit("ACTIVE"),
+ lit(1),
+ lit("SHUTOFF"),
+ lit(1)
)
}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
index 40ffd282..6ec4995d 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -3,8 +3,8 @@ package com.atlarge.opendc.runner.web
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
-import java.time.Instant
import org.bson.Document
+import java.time.Instant
/**
* Manages the queue of scenarios that need to be processed.
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
index 499585ec..ab683985 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -1,6 +1,6 @@
package com.atlarge.opendc.runner.web
-import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -20,9 +20,10 @@ import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Field
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Projections
-import java.util.*
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.bson.Document
+import java.util.*
/**
* A helper class that converts the MongoDB topology into an OpenDC environment.
@@ -31,7 +32,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
/**
* Parse the topology with the specified [id].
*/
- override suspend fun construct(dom: Domain): Environment {
+ override suspend fun construct(coroutineScope: CoroutineScope): Environment {
+ val clock = simulationContext.clock
val nodes = mutableListOf<SimpleBareMetalDriver>()
val random = Random(0)
@@ -59,7 +61,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
}
nodes.add(
SimpleBareMetalDriver(
- dom.newDomain(machineId),
+ coroutineScope,
+ clock,
UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$position",
mapOf(NODE_CLUSTER to clusterId),
@@ -73,8 +76,8 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
)
}
- val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
- dom.launch {
+ val provisioningService = SimpleProvisioningService()
+ coroutineScope.launch {
for (node in nodes) {
provisioningService.create(node)
}
@@ -83,7 +86,9 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
- UUID.randomUUID(), "opendc-platform", listOf(
+ UUID.randomUUID(),
+ "opendc-platform",
+ listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
)
)
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 1193f7b2..aea27972 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,9 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.flow.EventFlow
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
@@ -39,22 +37,23 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
-import java.util.PriorityQueue
-import java.util.Queue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
+import java.time.Clock
+import java.util.PriorityQueue
+import java.util.Queue
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Topology Scheduling.
*/
class StageWorkflowService(
- private val domain: Domain,
+ internal val coroutineScope: CoroutineScope,
+ internal val clock: Clock,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -63,7 +62,7 @@ class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy,
resourceFilterPolicy: ResourceFilterPolicy,
resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowService, CoroutineScope by domain {
+) : WorkflowService {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -175,7 +174,7 @@ class StageWorkflowService(
private val eventFlow = EventFlow<WorkflowEvent>()
init {
- domain.launch {
+ coroutineScope.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -191,9 +190,9 @@ class StageWorkflowService(
override val events: Flow<WorkflowEvent> = eventFlow
- override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
+ override suspend fun submit(job: Job) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, simulationContext.clock.millis())
+ val jobInstance = JobState(job, clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -241,7 +240,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis()))
rootListener.jobStarted(jobInstance)
}
@@ -295,7 +294,7 @@ class StageWorkflowService(
taskByServer[server] = instance
server.events
.onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(this)
+ .launchIn(coroutineScope)
activeTasks += instance
taskQueue.poll()
@@ -310,19 +309,19 @@ class StageWorkflowService(
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = simulationContext.clock.millis()
- eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ task.startedAt = clock.millis()
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = simulationContext.clock.millis()
+ task.finishedAt = clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, clock.millis()))
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -347,7 +346,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis()))
rootListener.jobFinished(job)
}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index 776f0b07..cb075b18 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +65,12 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
- val delay = quantum - (ctx.clock.millis() % quantum)
+ val delay = quantum - (scheduler.clock.millis() % quantum)
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +91,10 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.domain.launch {
+ val job = scheduler.coroutineScope.launch {
delay(delay)
next = null
scheduler.schedule()
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index a60ba0e2..ad818dde 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
import com.atlarge.opendc.workflows.workload.Job
-import java.util.UUID
import kotlinx.coroutines.flow.Flow
+import java.util.UUID
/**
* A service for cloud workflow management.
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 5c129e37..655d8e1d 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,8 +35,6 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import java.util.ServiceLoader
-import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -46,6 +44,8 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
+import java.util.ServiceLoader
+import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
@@ -68,11 +68,13 @@ internal class StageWorkflowSchedulerIntegrationTest {
val schedulerDomain = system.newDomain(name = "scheduler")
val schedulerAsync = schedulerDomain.async {
+ val clock = simulationContext.clock
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(system.newDomain("topology")) }
+ .use { it.construct(schedulerDomain) }
StageWorkflowService(
schedulerDomain,
+ clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,