summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-23 12:49:32 +0200
committerGitHub <noreply@github.com>2022-09-23 12:49:32 +0200
commit2d2a3854d355bd4b074ef651f291d34081e70d96 (patch)
treee99c4bf7e5647341c1e269797f7f46099753436f /opendc-compute
parent8d1d091f093e6ac32dba1e6a4f74490b280fcc4b (diff)
parent3d5eb562227dcad5a8a60f31b96e6d68f7774fb2 (diff)
merge: Do not require interference model during topology construction (#102)
This pull request refactors the existing workload interference model in order to remove a dependency on it during the topology construction. With this change, interference domains (e.g., a single host) can be constructed independently of the interference profiles of virtual machines. ## Implementation Notes :hammer_and_pick: * Move VM interference model into compute simulator * Remove convergence listener parameter * Remove FlowEngine from SimMachineContext * Remove timestamp parameter from SimTrace * Pass interference key via parameter * Move interference logic into VmInterferenceMember * Prevent boxing in interference algorithm * Extract Random dependency from interference model * Add separate error host state * Simplify constructor of SimHost * Make interference domain independent of profile ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * The interface of `VmInterferenceModel` is changed. Users do not need to provide a seed for the model anymore. * A `VmInterferenceModel` should be passed via the metadata parameter of `startWorkload` to enable workload interference.
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt5
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt11
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt45
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt87
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt22
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt70
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt29
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt12
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt9
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt7
16 files changed, 158 insertions, 167 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index c0b70268..28ef7c40 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -37,6 +37,11 @@ import kotlin.coroutines.CoroutineContext
*/
public interface ComputeService : AutoCloseable {
/**
+ * The servers that are registered with the "compute" service.
+ */
+ public val servers: List<Server>
+
+ /**
* The hosts that are registered with the "compute" service.
*/
public val hosts: Set<Host>
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt
index 6d85ee2d..ca6c625c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt
@@ -27,12 +27,17 @@ package org.opendc.compute.service.driver
*/
public enum class HostState {
/**
- * The host is up.
+ * The host is up and able to host guests.
*/
UP,
/**
- * The host is down.
+ * The host is in a (forced) down state and unable to host any guests.
*/
- DOWN
+ DOWN,
+
+ /**
+ * The host is in an error state and unable to host any guests.
+ */
+ ERROR,
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 21aaa19e..519cf6c6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -91,17 +91,18 @@ internal class ComputeServiceImpl(
/**
* The registered flavors for this compute service.
*/
- internal val flavors = mutableMapOf<UUID, InternalFlavor>()
+ internal val flavorById = mutableMapOf<UUID, InternalFlavor>()
/**
* The registered images for this compute service.
*/
- internal val images = mutableMapOf<UUID, InternalImage>()
+ internal val imageById = mutableMapOf<UUID, InternalImage>()
/**
* The registered servers for this compute service.
*/
- private val servers = mutableMapOf<UUID, InternalServer>()
+ private val serverById = mutableMapOf<UUID, InternalServer>()
+ override val servers: MutableList<Server> = mutableListOf()
private var maxCores = 0
private var maxMemory = 0L
@@ -127,13 +128,13 @@ internal class ComputeServiceImpl(
override suspend fun queryFlavors(): List<Flavor> {
check(!isClosed) { "Client is already closed" }
- return flavors.values.map { ClientFlavor(it) }
+ return flavorById.values.map { ClientFlavor(it) }
}
override suspend fun findFlavor(id: UUID): Flavor? {
check(!isClosed) { "Client is already closed" }
- return flavors[id]?.let { ClientFlavor(it) }
+ return flavorById[id]?.let { ClientFlavor(it) }
}
override suspend fun newFlavor(
@@ -156,7 +157,7 @@ internal class ComputeServiceImpl(
meta
)
- flavors[uid] = flavor
+ flavorById[uid] = flavor
return ClientFlavor(flavor)
}
@@ -164,13 +165,13 @@ internal class ComputeServiceImpl(
override suspend fun queryImages(): List<Image> {
check(!isClosed) { "Client is already closed" }
- return images.values.map { ClientImage(it) }
+ return imageById.values.map { ClientImage(it) }
}
override suspend fun findImage(id: UUID): Image? {
check(!isClosed) { "Client is already closed" }
- return images[id]?.let { ClientImage(it) }
+ return imageById[id]?.let { ClientImage(it) }
}
override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image {
@@ -179,7 +180,7 @@ internal class ComputeServiceImpl(
val uid = UUID(clock.millis(), random.nextLong())
val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta)
- images[uid] = image
+ imageById[uid] = image
return ClientImage(image)
}
@@ -199,13 +200,14 @@ internal class ComputeServiceImpl(
this@ComputeServiceImpl,
uid,
name,
- requireNotNull(flavors[flavor.uid]) { "Unknown flavor" },
- requireNotNull(images[image.uid]) { "Unknown image" },
+ requireNotNull(flavorById[flavor.uid]) { "Unknown flavor" },
+ requireNotNull(imageById[image.uid]) { "Unknown image" },
labels.toMutableMap(),
meta.toMutableMap()
)
- servers[uid] = server
+ serverById[uid] = server
+ servers.add(server)
if (start) {
server.start()
@@ -217,13 +219,13 @@ internal class ComputeServiceImpl(
override suspend fun findServer(id: UUID): Server? {
check(!isClosed) { "Client is already closed" }
- return servers[id]?.let { ClientServer(it) }
+ return serverById[id]?.let { ClientServer(it) }
}
override suspend fun queryServers(): List<Server> {
check(!isClosed) { "Client is already closed" }
- return servers.values.map { ClientServer(it) }
+ return serverById.values.map { ClientServer(it) }
}
override fun close() {
@@ -263,7 +265,11 @@ internal class ComputeServiceImpl(
}
override fun lookupHost(server: Server): Host? {
- val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" }
+ if (server is InternalServer) {
+ return server.host
+ }
+
+ val internal = requireNotNull(serverById[server.uid]) { "Invalid server passed to lookupHost" }
return internal.host
}
@@ -296,15 +302,16 @@ internal class ComputeServiceImpl(
}
internal fun delete(flavor: InternalFlavor) {
- flavors.remove(flavor.uid)
+ flavorById.remove(flavor.uid)
}
internal fun delete(image: InternalImage) {
- images.remove(image.uid)
+ imageById.remove(image.uid)
}
internal fun delete(server: InternalServer) {
- servers.remove(server.uid)
+ serverById.remove(server.uid)
+ servers.remove(server)
}
/**
@@ -411,7 +418,7 @@ internal class ComputeServiceImpl(
// Re-schedule on the new machine
requestSchedulingCycle()
}
- HostState.DOWN -> {
+ else -> {
logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index c28239b4..c04573b5 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -35,17 +35,10 @@ import org.opendc.compute.simulator.internal.Guest
import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.kernel.SimHypervisor
-import org.opendc.simulator.compute.kernel.SimHypervisorProvider
-import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.power.ConstantPowerModel
-import org.opendc.simulator.compute.power.PowerDriver
-import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.flow.FlowEngine
+import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
@@ -57,44 +50,20 @@ import kotlin.coroutines.CoroutineContext
public class SimHost(
override val uid: UUID,
override val name: String,
- model: MachineModel,
override val meta: Map<String, Any>,
- context: CoroutineContext,
- engine: FlowEngine,
- hypervisorProvider: SimHypervisorProvider,
- scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
- powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ private val machine: SimBareMetalMachine,
+ private val hypervisor: SimHypervisor,
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
- interferenceDomain: VmInterferenceDomain? = null,
private val optimize: Boolean = false
) : Host, AutoCloseable {
/**
- * The [CoroutineScope] of the host bounded by the lifecycle of the host.
- */
- private val scope: CoroutineScope = CoroutineScope(context + Job())
-
- /**
- * The clock instance used by the host.
- */
- private val clock = engine.clock
-
- /**
* The event listeners registered with this host.
*/
private val listeners = mutableListOf<HostListener>()
/**
- * The machine to run on.
- */
- public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver)
-
- /**
- * The hypervisor to run multiple workloads.
- */
- private val hypervisor: SimHypervisor = hypervisorProvider
- .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain)
-
- /**
* The virtual machines running on the hypervisor.
*/
private val guests = HashMap<Server, Guest>()
@@ -113,7 +82,11 @@ public class SimHost(
field = value
}
- override val model: HostModel = HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size })
+ override val model: HostModel = HostModel(
+ machine.model.cpus.sumOf { it.frequency },
+ machine.model.cpus.size,
+ machine.model.memory.sumOf { it.size }
+ )
/**
* The [GuestListener] that listens for guest events.
@@ -144,9 +117,9 @@ public class SimHost(
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name)
+ val machine = hypervisor.newMachine(key.flavor.toMachineModel())
val newGuest = Guest(
- scope.coroutineContext,
+ context,
clock,
this,
hypervisor,
@@ -193,8 +166,7 @@ public class SimHost(
}
override fun close() {
- reset()
- scope.cancel()
+ reset(HostState.DOWN)
machine.cancel()
}
@@ -269,7 +241,7 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
public suspend fun fail() {
- reset()
+ reset(HostState.ERROR)
for (guest in _guests) {
guest.fail()
@@ -308,7 +280,7 @@ public class SimHost(
_state = HostState.UP
hypervisor.onStart(ctx)
} catch (cause: Throwable) {
- _state = HostState.DOWN
+ _state = HostState.ERROR
_ctx = null
throw cause
}
@@ -318,7 +290,6 @@ public class SimHost(
try {
hypervisor.onStop(ctx)
} finally {
- _state = HostState.DOWN
_ctx = null
}
}
@@ -328,12 +299,12 @@ public class SimHost(
/**
* Reset the machine.
*/
- private fun reset() {
+ private fun reset(state: HostState) {
updateUptime()
// Stop the hypervisor
_ctx?.close()
- _state = HostState.DOWN
+ _state = state
}
/**
@@ -346,26 +317,8 @@ public class SimHost(
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) }
val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
- return MachineModel(processingUnits, memoryUnits).optimize()
- }
-
- /**
- * Optimize the [MachineModel] for simulation.
- */
- private fun MachineModel.optimize(): MachineModel {
- if (!optimize) {
- return this
- }
-
- val originalCpu = cpus[0]
- val freq = cpus.sumOf { it.frequency }
- val processingNode = originalCpu.node.copy(coreCount = 1)
- val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode))
-
- val memorySize = memory.sumOf { it.size }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
-
- return MachineModel(processingUnits, memoryUnits)
+ val model = MachineModel(processingUnits, memoryUnits)
+ return if (optimize) model.optimize() else model
}
private var _lastReport = clock.millis()
@@ -384,7 +337,7 @@ public class SimHost(
if (_state == HostState.UP) {
_uptime += duration
- } else if (_state == HostState.DOWN && scope.isActive) {
+ } else if (_state == HostState.ERROR) {
// Only increment downtime if the machine is in a failure state
_downtime += duration
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index ea3c6549..cc084526 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -217,7 +217,7 @@ internal class Guest(
*/
private suspend fun runMachine(workload: SimWorkload) {
delay(1) // TODO Introduce model for boot time
- machine.runWorkload(workload, mapOf("driver" to host, "server" to server))
+ machine.runWorkload(workload, mapOf("driver" to host, "server" to server) + server.meta)
}
/**
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 5ba4a667..a7993291 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -30,16 +30,20 @@ import org.junit.jupiter.api.assertAll
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.SimBareMetalMachine
+import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
import java.time.Instant
import java.util.*
import kotlin.coroutines.resume
@@ -67,14 +71,16 @@ internal class SimHostTest {
fun testOvercommitted() = runBlockingSimulation {
val duration = 5 * 60L
val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null)
val host = SimHost(
uid = UUID.randomUUID(),
name = "test",
- model = machineModel,
meta = emptyMap(),
coroutineContext,
- engine,
- SimFairShareHypervisorProvider()
+ clock,
+ machine,
+ hypervisor
)
val vmImageA = MockImage(
UUID.randomUUID(),
@@ -149,14 +155,16 @@ internal class SimHostTest {
fun testFailure() = runBlockingSimulation {
val duration = 5 * 60L
val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null)
val host = SimHost(
uid = UUID.randomUUID(),
name = "test",
- model = machineModel,
meta = emptyMap(),
coroutineContext,
- engine,
- SimFairShareHypervisorProvider()
+ clock,
+ machine,
+ hypervisor
)
val image = MockImage(
UUID.randomUUID(),
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index fddb4890..f6744123 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -26,12 +26,12 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
-import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.SimBareMetalMachine
+import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.flow.FlowEngine
import java.time.Clock
@@ -46,55 +46,52 @@ import kotlin.math.max
* @param context [CoroutineContext] to run the simulation in.
* @param clock [Clock] instance tracking simulation time.
* @param scheduler [ComputeScheduler] implementation to use for the service.
- * @param failureModel A failure model to use for injecting failures.
- * @param interferenceModel The model to use for performance interference.
* @param schedulingQuantum The scheduling quantum of the scheduler.
*/
public class ComputeServiceHelper(
private val context: CoroutineContext,
private val clock: Clock,
scheduler: ComputeScheduler,
- private val failureModel: FailureModel? = null,
- private val interferenceModel: VmInterferenceModel? = null,
+ seed: Long,
schedulingQuantum: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
/**
* The [ComputeService] that has been configured by the manager.
*/
- public val service: ComputeService
+ public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum)
/**
* The [FlowEngine] to simulate the hosts.
*/
- private val _engine = FlowEngine(context, clock)
+ private val engine = FlowEngine(context, clock)
/**
* The hosts that belong to this class.
*/
- private val _hosts = mutableSetOf<SimHost>()
+ private val hosts = mutableSetOf<SimHost>()
- init {
- val service = createService(scheduler, schedulingQuantum)
- this.service = service
- }
+ /**
+ * The source of randomness.
+ */
+ private val random = SplittableRandom(seed)
/**
* Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*
* @param trace The trace to simulate.
- * @param seed The seed for the simulation.
- * @param servers A list to which the created servers is added.
* @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
+ * @param failureModel A failure model to use for injecting failures.
+ * @param interference A flag to indicate that VM interference needs to be enabled.
*/
public suspend fun run(
trace: List<VirtualMachine>,
- seed: Long,
- servers: MutableList<Server>? = null,
- submitImmediately: Boolean = false
+ submitImmediately: Boolean = false,
+ failureModel: FailureModel? = null,
+ interference: Boolean = false,
) {
- val random = Random(seed)
- val injector = failureModel?.createInjector(context, clock, service, random)
+ val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong()))
val client = service.newClient()
+ val clock = clock
// Create new image for the virtual machine
val image = client.newImage("vm-image")
@@ -121,10 +118,16 @@ public class ComputeServiceHelper(
delay(max(0, (start - offset) - now))
}
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload(entry.trace, workloadOffset)
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
+ val meta = mutableMapOf<String, Any>("workload" to workload)
+ val interferenceProfile = entry.interferenceProfile
+ if (interference && interferenceProfile != null) {
+ meta["interference-profile"] = interferenceProfile
+ }
+
+ launch {
val server = client.newServer(
entry.name,
image,
@@ -134,11 +137,9 @@ public class ComputeServiceHelper(
entry.memCapacity,
meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
),
- meta = mapOf("workload" to workload)
+ meta = meta
)
- servers?.add(server)
-
// Wait for the server reach its end time
val endTime = entry.stopTime.toEpochMilli()
delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
@@ -164,20 +165,21 @@ public class ComputeServiceHelper(
* @return The [SimHost] that has been constructed by the runner.
*/
public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost {
+ val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver)
+ val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, random)
+
val host = SimHost(
spec.uid,
spec.name,
- spec.model,
spec.meta,
context,
- _engine,
- spec.hypervisor,
- powerDriver = spec.powerDriver,
- interferenceDomain = interferenceModel?.newDomain(),
+ clock,
+ machine,
+ hypervisor,
optimize = optimize
)
- require(_hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
+ require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
service.addHost(host)
return host
@@ -186,11 +188,11 @@ public class ComputeServiceHelper(
override fun close() {
service.close()
- for (host in _hosts) {
+ for (host in hosts) {
host.close()
}
- _hosts.clear()
+ hosts.clear()
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index aa0b5eaf..78002c2f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.workload
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import java.util.*
/**
@@ -32,10 +31,5 @@ public interface ComputeWorkload {
/**
* Resolve the workload into a list of [VirtualMachine]s to simulate.
*/
- public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved
-
- /**
- * A concrete instance of a workload.
- */
- public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?)
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 12c2325a..387a3ec2 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -48,7 +48,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* The cache of workloads.
*/
- private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>()
+ private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>()
/**
* Read the fragments into memory.
@@ -87,7 +87,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> {
+ private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
val idCol = reader.resolve(RESOURCE_ID)
@@ -128,7 +128,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
totalLoad,
submissionTime,
endTime,
- builder.build()
+ builder.build(),
+ interferenceModel.getProfile(id)
)
)
}
@@ -159,7 +160,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val modelBuilder = VmInterferenceModel.builder()
while (reader.nextRow()) {
- @Suppress("UNCHECKED_CAST")
val members = reader.getSet(membersCol, String::class.java)!!
val target = reader.getDouble(targetCol)
val score = reader.getDouble(scoreCol)
@@ -177,7 +177,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* Load the trace with the specified [name] and [format].
*/
- public fun get(name: String, format: String): ComputeWorkload.Resolved {
+ public fun get(name: String, format: String): List<VirtualMachine> {
val ref = cache.compute(name) { key, oldVal ->
val inst = oldVal?.get()
if (inst == null) {
@@ -188,11 +188,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val trace = Trace.open(path, format)
val fragments = parseFragments(trace)
- val vms = parseMeta(trace, fragments)
val interferenceModel = parseInterferenceModel(trace)
- val instance = ComputeWorkload.Resolved(vms, interferenceModel)
+ val vms = parseMeta(trace, fragments, interferenceModel)
- SoftReference(instance)
+ SoftReference(vms)
} else {
oldVal
}
@@ -223,6 +222,11 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
private val builder = SimTrace.builder()
/**
+ * The deadline of the previous fragment.
+ */
+ private var previousDeadline = Long.MIN_VALUE
+
+ /**
* Add a fragment to the trace.
*
* @param timestamp Timestamp at which the fragment starts (in epoch millis).
@@ -233,7 +237,14 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
val duration = max(0, deadline - timestamp)
totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
- builder.add(timestamp, deadline, usage, cores)
+
+ if (timestamp != previousDeadline) {
+ // There is a gap between the previous and current fragment; fill the gap
+ builder.add(timestamp, 0.0, cores)
+ }
+
+ builder.add(deadline, usage, cores)
+ previousDeadline = deadline
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
index 88e80719..8560b537 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.workload
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile
import org.opendc.simulator.compute.workload.SimTrace
import java.time.Instant
import java.util.*
@@ -37,6 +38,7 @@ import java.util.*
* @param startTime The start time of the VM.
* @param stopTime The stop time of the VM.
* @param trace The trace that belong to this VM.
+ * @param interferenceProfile The interference profile of this virtual machine.
*/
public data class VirtualMachine(
val uid: UUID,
@@ -48,4 +50,5 @@ public data class VirtualMachine(
val startTime: Instant,
val stopTime: Instant,
val trace: SimTrace,
+ val interferenceProfile: VmInterferenceProfile?
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
index 1959c48d..9b2bec55 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
- val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } }
+ val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
val res = mutableListOf<VirtualMachine>()
- for ((fraction, w) in traces) {
+ for ((fraction, vms) in traces) {
var currentLoad = 0.0
- for (entry in w.vms) {
+ for (entry in vms) {
val entryLoad = entry.totalLoad
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
@@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
}
}
- val vmCount = traces.sumOf { (_, w) -> w.vms.size }
+ val vmCount = traces.sumOf { (_, vms) -> vms.size }
logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
- return ComputeWorkload.Resolved(res, null)
+ return res
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
index 84a77f0f..52f4c672 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
*/
private val pattern = Regex("^(ComputeNode|cn).*")
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
- val (vms, interferenceModel) = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
val (hpc, nonHpc) = vms.partition { entry ->
val name = entry.name
@@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return ComputeWorkload.Resolved(res, interferenceModel)
+ return res
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
index bc13560c..ef6de729 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
- val (vms, interferenceModel) = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
val res = mutableListOf<VirtualMachine>()
val totalLoad = vms.sumOf { it.totalLoad }
@@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return ComputeWorkload.Resolved(res, interferenceModel)
+ return res
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index dc9abaef..c20cb8f3 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -24,13 +24,14 @@ package org.opendc.compute.workload.internal
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
import java.util.*
/**
* A [ComputeWorkload] from a trace.
*/
internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
return loader.get(name, format)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
index 45bd9ab1..a0ec4bd6 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
@@ -42,7 +42,6 @@ import java.time.Instant
* @param scope The [CoroutineScope] to run the reader in.
* @param clock The virtual clock.
* @param service The [ComputeService] to monitor.
- * @param servers The [Server]s to monitor.
* @param monitor The monitor to export the metrics to.
* @param exportInterval The export interval.
*/
@@ -50,7 +49,6 @@ public class ComputeMetricReader(
scope: CoroutineScope,
clock: Clock,
private val service: ComputeService,
- private val servers: List<Server>,
private val monitor: ComputeMonitor,
private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
@@ -76,6 +74,11 @@ public class ComputeMetricReader(
*/
private val job = scope.launch {
val intervalMs = exportInterval.toMillis()
+ val service = service
+ val monitor = monitor
+ val hostTableReaders = hostTableReaders
+ val serverTableReaders = serverTableReaders
+ val serviceTableReader = serviceTableReader
try {
while (isActive) {
@@ -91,7 +94,7 @@ public class ComputeMetricReader(
reader.reset()
}
- for (server in servers) {
+ for (server in service.servers) {
val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
reader.record(now)
monitor.record(reader)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
index f3dc1e9e..87530f5a 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
@@ -22,10 +22,9 @@
package org.opendc.compute.workload.topology
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.SimHypervisorProvider
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.power.PowerDriver
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
import java.util.*
/**
@@ -36,7 +35,7 @@ import java.util.*
* @param meta The metadata of the host.
* @param model The physical model of the machine.
* @param powerDriver The [PowerDriver] to model the power consumption of the machine.
- * @param hypervisor The hypervisor implementation to use.
+ * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host.
*/
public data class HostSpec(
val uid: UUID,
@@ -44,5 +43,5 @@ public data class HostSpec(
val meta: Map<String, Any>,
val model: MachineModel,
val powerDriver: PowerDriver,
- val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider()
+ val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer()
)