diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-23 12:49:32 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-09-23 12:49:32 +0200 |
| commit | 2d2a3854d355bd4b074ef651f291d34081e70d96 (patch) | |
| tree | e99c4bf7e5647341c1e269797f7f46099753436f /opendc-compute | |
| parent | 8d1d091f093e6ac32dba1e6a4f74490b280fcc4b (diff) | |
| parent | 3d5eb562227dcad5a8a60f31b96e6d68f7774fb2 (diff) | |
merge: Do not require interference model during topology construction (#102)
This pull request refactors the existing workload interference model in order
to remove a dependency on it during the topology construction. With this change,
interference domains (e.g., a single host) can be constructed independently of the
interference profiles of virtual machines.
## Implementation Notes :hammer_and_pick:
* Move VM interference model into compute simulator
* Remove convergence listener parameter
* Remove FlowEngine from SimMachineContext
* Remove timestamp parameter from SimTrace
* Pass interference key via parameter
* Move interference logic into VmInterferenceMember
* Prevent boxing in interference algorithm
* Extract Random dependency from interference model
* Add separate error host state
* Simplify constructor of SimHost
* Make interference domain independent of profile
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* The interface of `VmInterferenceModel` is changed. Users do not need to
provide a seed for the model anymore.
* A `VmInterferenceModel` should be passed via the metadata parameter of
`startWorkload` to enable workload interference.
Diffstat (limited to 'opendc-compute')
16 files changed, 158 insertions, 167 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index c0b70268..28ef7c40 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -37,6 +37,11 @@ import kotlin.coroutines.CoroutineContext */ public interface ComputeService : AutoCloseable { /** + * The servers that are registered with the "compute" service. + */ + public val servers: List<Server> + + /** * The hosts that are registered with the "compute" service. */ public val hosts: Set<Host> diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt index 6d85ee2d..ca6c625c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt @@ -27,12 +27,17 @@ package org.opendc.compute.service.driver */ public enum class HostState { /** - * The host is up. + * The host is up and able to host guests. */ UP, /** - * The host is down. + * The host is in a (forced) down state and unable to host any guests. */ - DOWN + DOWN, + + /** + * The host is in an error state and unable to host any guests. + */ + ERROR, } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 21aaa19e..519cf6c6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -91,17 +91,18 @@ internal class ComputeServiceImpl( /** * The registered flavors for this compute service. */ - internal val flavors = mutableMapOf<UUID, InternalFlavor>() + internal val flavorById = mutableMapOf<UUID, InternalFlavor>() /** * The registered images for this compute service. */ - internal val images = mutableMapOf<UUID, InternalImage>() + internal val imageById = mutableMapOf<UUID, InternalImage>() /** * The registered servers for this compute service. */ - private val servers = mutableMapOf<UUID, InternalServer>() + private val serverById = mutableMapOf<UUID, InternalServer>() + override val servers: MutableList<Server> = mutableListOf() private var maxCores = 0 private var maxMemory = 0L @@ -127,13 +128,13 @@ internal class ComputeServiceImpl( override suspend fun queryFlavors(): List<Flavor> { check(!isClosed) { "Client is already closed" } - return flavors.values.map { ClientFlavor(it) } + return flavorById.values.map { ClientFlavor(it) } } override suspend fun findFlavor(id: UUID): Flavor? { check(!isClosed) { "Client is already closed" } - return flavors[id]?.let { ClientFlavor(it) } + return flavorById[id]?.let { ClientFlavor(it) } } override suspend fun newFlavor( @@ -156,7 +157,7 @@ internal class ComputeServiceImpl( meta ) - flavors[uid] = flavor + flavorById[uid] = flavor return ClientFlavor(flavor) } @@ -164,13 +165,13 @@ internal class ComputeServiceImpl( override suspend fun queryImages(): List<Image> { check(!isClosed) { "Client is already closed" } - return images.values.map { ClientImage(it) } + return imageById.values.map { ClientImage(it) } } override suspend fun findImage(id: UUID): Image? { check(!isClosed) { "Client is already closed" } - return images[id]?.let { ClientImage(it) } + return imageById[id]?.let { ClientImage(it) } } override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { @@ -179,7 +180,7 @@ internal class ComputeServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) - images[uid] = image + imageById[uid] = image return ClientImage(image) } @@ -199,13 +200,14 @@ internal class ComputeServiceImpl( this@ComputeServiceImpl, uid, name, - requireNotNull(flavors[flavor.uid]) { "Unknown flavor" }, - requireNotNull(images[image.uid]) { "Unknown image" }, + requireNotNull(flavorById[flavor.uid]) { "Unknown flavor" }, + requireNotNull(imageById[image.uid]) { "Unknown image" }, labels.toMutableMap(), meta.toMutableMap() ) - servers[uid] = server + serverById[uid] = server + servers.add(server) if (start) { server.start() @@ -217,13 +219,13 @@ internal class ComputeServiceImpl( override suspend fun findServer(id: UUID): Server? { check(!isClosed) { "Client is already closed" } - return servers[id]?.let { ClientServer(it) } + return serverById[id]?.let { ClientServer(it) } } override suspend fun queryServers(): List<Server> { check(!isClosed) { "Client is already closed" } - return servers.values.map { ClientServer(it) } + return serverById.values.map { ClientServer(it) } } override fun close() { @@ -263,7 +265,11 @@ internal class ComputeServiceImpl( } override fun lookupHost(server: Server): Host? { - val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" } + if (server is InternalServer) { + return server.host + } + + val internal = requireNotNull(serverById[server.uid]) { "Invalid server passed to lookupHost" } return internal.host } @@ -296,15 +302,16 @@ internal class ComputeServiceImpl( } internal fun delete(flavor: InternalFlavor) { - flavors.remove(flavor.uid) + flavorById.remove(flavor.uid) } internal fun delete(image: InternalImage) { - images.remove(image.uid) + imageById.remove(image.uid) } internal fun delete(server: InternalServer) { - servers.remove(server.uid) + serverById.remove(server.uid) + servers.remove(server) } /** @@ -411,7 +418,7 @@ internal class ComputeServiceImpl( // Re-schedule on the new machine requestSchedulingCycle() } - HostState.DOWN -> { + else -> { logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index c28239b4..c04573b5 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -35,17 +35,10 @@ import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.kernel.SimHypervisorProvider -import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.PowerDriver -import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowEngine +import java.time.Clock import java.time.Duration import java.time.Instant import java.util.* @@ -57,44 +50,20 @@ import kotlin.coroutines.CoroutineContext public class SimHost( override val uid: UUID, override val name: String, - model: MachineModel, override val meta: Map<String, Any>, - context: CoroutineContext, - engine: FlowEngine, - hypervisorProvider: SimHypervisorProvider, - scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), - powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), + private val context: CoroutineContext, + private val clock: Clock, + private val machine: SimBareMetalMachine, + private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - interferenceDomain: VmInterferenceDomain? = null, private val optimize: Boolean = false ) : Host, AutoCloseable { /** - * The [CoroutineScope] of the host bounded by the lifecycle of the host. - */ - private val scope: CoroutineScope = CoroutineScope(context + Job()) - - /** - * The clock instance used by the host. - */ - private val clock = engine.clock - - /** * The event listeners registered with this host. */ private val listeners = mutableListOf<HostListener>() /** - * The machine to run on. - */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver) - - /** - * The hypervisor to run multiple workloads. - */ - private val hypervisor: SimHypervisor = hypervisorProvider - .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain) - - /** * The virtual machines running on the hypervisor. */ private val guests = HashMap<Server, Guest>() @@ -113,7 +82,11 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size }) + override val model: HostModel = HostModel( + machine.model.cpus.sumOf { it.frequency }, + machine.model.cpus.size, + machine.model.memory.sumOf { it.size } + ) /** * The [GuestListener] that listens for guest events. @@ -144,9 +117,9 @@ public class SimHost( val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name) + val machine = hypervisor.newMachine(key.flavor.toMachineModel()) val newGuest = Guest( - scope.coroutineContext, + context, clock, this, hypervisor, @@ -193,8 +166,7 @@ public class SimHost( } override fun close() { - reset() - scope.cancel() + reset(HostState.DOWN) machine.cancel() } @@ -269,7 +241,7 @@ public class SimHost( override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" public suspend fun fail() { - reset() + reset(HostState.ERROR) for (guest in _guests) { guest.fail() @@ -308,7 +280,7 @@ public class SimHost( _state = HostState.UP hypervisor.onStart(ctx) } catch (cause: Throwable) { - _state = HostState.DOWN + _state = HostState.ERROR _ctx = null throw cause } @@ -318,7 +290,6 @@ public class SimHost( try { hypervisor.onStop(ctx) } finally { - _state = HostState.DOWN _ctx = null } } @@ -328,12 +299,12 @@ public class SimHost( /** * Reset the machine. */ - private fun reset() { + private fun reset(state: HostState) { updateUptime() // Stop the hypervisor _ctx?.close() - _state = HostState.DOWN + _state = state } /** @@ -346,26 +317,8 @@ public class SimHost( val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - return MachineModel(processingUnits, memoryUnits).optimize() - } - - /** - * Optimize the [MachineModel] for simulation. - */ - private fun MachineModel.optimize(): MachineModel { - if (!optimize) { - return this - } - - val originalCpu = cpus[0] - val freq = cpus.sumOf { it.frequency } - val processingNode = originalCpu.node.copy(coreCount = 1) - val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) - - val memorySize = memory.sumOf { it.size } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - - return MachineModel(processingUnits, memoryUnits) + val model = MachineModel(processingUnits, memoryUnits) + return if (optimize) model.optimize() else model } private var _lastReport = clock.millis() @@ -384,7 +337,7 @@ public class SimHost( if (_state == HostState.UP) { _uptime += duration - } else if (_state == HostState.DOWN && scope.isActive) { + } else if (_state == HostState.ERROR) { // Only increment downtime if the machine is in a failure state _downtime += duration } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index ea3c6549..cc084526 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -217,7 +217,7 @@ internal class Guest( */ private suspend fun runMachine(workload: SimWorkload) { delay(1) // TODO Introduce model for boot time - machine.runWorkload(workload, mapOf("driver" to host, "server" to server)) + machine.runWorkload(workload, mapOf("driver" to host, "server" to server) + server.meta) } /** diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5ba4a667..a7993291 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -30,16 +30,20 @@ import org.junit.jupiter.api.assertAll import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.time.Instant import java.util.* import kotlin.coroutines.resume @@ -67,14 +71,16 @@ internal class SimHostTest { fun testOvercommitted() = runBlockingSimulation { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", - model = machineModel, meta = emptyMap(), coroutineContext, - engine, - SimFairShareHypervisorProvider() + clock, + machine, + hypervisor ) val vmImageA = MockImage( UUID.randomUUID(), @@ -149,14 +155,16 @@ internal class SimHostTest { fun testFailure() = runBlockingSimulation { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", - model = machineModel, meta = emptyMap(), coroutineContext, - engine, - SimFairShareHypervisorProvider() + clock, + machine, + hypervisor ) val image = MockImage( UUID.randomUUID(), diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index fddb4890..f6744123 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -26,12 +26,12 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield -import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.flow.FlowEngine import java.time.Clock @@ -46,55 +46,52 @@ import kotlin.math.max * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. * @param scheduler [ComputeScheduler] implementation to use for the service. - * @param failureModel A failure model to use for injecting failures. - * @param interferenceModel The model to use for performance interference. * @param schedulingQuantum The scheduling quantum of the scheduler. */ public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, scheduler: ComputeScheduler, - private val failureModel: FailureModel? = null, - private val interferenceModel: VmInterferenceModel? = null, + seed: Long, schedulingQuantum: Duration = Duration.ofMinutes(5) ) : AutoCloseable { /** * The [ComputeService] that has been configured by the manager. */ - public val service: ComputeService + public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum) /** * The [FlowEngine] to simulate the hosts. */ - private val _engine = FlowEngine(context, clock) + private val engine = FlowEngine(context, clock) /** * The hosts that belong to this class. */ - private val _hosts = mutableSetOf<SimHost>() + private val hosts = mutableSetOf<SimHost>() - init { - val service = createService(scheduler, schedulingQuantum) - this.service = service - } + /** + * The source of randomness. + */ + private val random = SplittableRandom(seed) /** * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. * * @param trace The trace to simulate. - * @param seed The seed for the simulation. - * @param servers A list to which the created servers is added. * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param failureModel A failure model to use for injecting failures. + * @param interference A flag to indicate that VM interference needs to be enabled. */ public suspend fun run( trace: List<VirtualMachine>, - seed: Long, - servers: MutableList<Server>? = null, - submitImmediately: Boolean = false + submitImmediately: Boolean = false, + failureModel: FailureModel? = null, + interference: Boolean = false, ) { - val random = Random(seed) - val injector = failureModel?.createInjector(context, clock, service, random) + val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) val client = service.newClient() + val clock = clock // Create new image for the virtual machine val image = client.newImage("vm-image") @@ -121,10 +118,16 @@ public class ComputeServiceHelper( delay(max(0, (start - offset) - now)) } - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload(entry.trace, workloadOffset) + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload(entry.trace, workloadOffset) + val meta = mutableMapOf<String, Any>("workload" to workload) + val interferenceProfile = entry.interferenceProfile + if (interference && interferenceProfile != null) { + meta["interference-profile"] = interferenceProfile + } + + launch { val server = client.newServer( entry.name, image, @@ -134,11 +137,9 @@ public class ComputeServiceHelper( entry.memCapacity, meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() ), - meta = mapOf("workload" to workload) + meta = meta ) - servers?.add(server) - // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) @@ -164,20 +165,21 @@ public class ComputeServiceHelper( * @return The [SimHost] that has been constructed by the runner. */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { + val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver) + val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, random) + val host = SimHost( spec.uid, spec.name, - spec.model, spec.meta, context, - _engine, - spec.hypervisor, - powerDriver = spec.powerDriver, - interferenceDomain = interferenceModel?.newDomain(), + clock, + machine, + hypervisor, optimize = optimize ) - require(_hosts.add(host)) { "Host with uid ${spec.uid} already exists" } + require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } service.addHost(host) return host @@ -186,11 +188,11 @@ public class ComputeServiceHelper( override fun close() { service.close() - for (host in _hosts) { + for (host in hosts) { host.close() } - _hosts.clear() + hosts.clear() } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index aa0b5eaf..78002c2f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -22,7 +22,6 @@ package org.opendc.compute.workload -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import java.util.* /** @@ -32,10 +31,5 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. */ - public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved - - /** - * A concrete instance of a workload. - */ - public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?) + public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 12c2325a..387a3ec2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -48,7 +48,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The cache of workloads. */ - private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>() + private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>() /** * Read the fragments into memory. @@ -87,7 +87,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> { + private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() val idCol = reader.resolve(RESOURCE_ID) @@ -128,7 +128,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { totalLoad, submissionTime, endTime, - builder.build() + builder.build(), + interferenceModel.getProfile(id) ) ) } @@ -159,7 +160,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val modelBuilder = VmInterferenceModel.builder() while (reader.nextRow()) { - @Suppress("UNCHECKED_CAST") val members = reader.getSet(membersCol, String::class.java)!! val target = reader.getDouble(targetCol) val score = reader.getDouble(scoreCol) @@ -177,7 +177,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Load the trace with the specified [name] and [format]. */ - public fun get(name: String, format: String): ComputeWorkload.Resolved { + public fun get(name: String, format: String): List<VirtualMachine> { val ref = cache.compute(name) { key, oldVal -> val inst = oldVal?.get() if (inst == null) { @@ -188,11 +188,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val trace = Trace.open(path, format) val fragments = parseFragments(trace) - val vms = parseMeta(trace, fragments) val interferenceModel = parseInterferenceModel(trace) - val instance = ComputeWorkload.Resolved(vms, interferenceModel) + val vms = parseMeta(trace, fragments, interferenceModel) - SoftReference(instance) + SoftReference(vms) } else { oldVal } @@ -223,6 +222,11 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private val builder = SimTrace.builder() /** + * The deadline of the previous fragment. + */ + private var previousDeadline = Long.MIN_VALUE + + /** * Add a fragment to the trace. * * @param timestamp Timestamp at which the fragment starts (in epoch millis). @@ -233,7 +237,14 @@ public class ComputeWorkloadLoader(private val baseDir: File) { fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { val duration = max(0, deadline - timestamp) totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs - builder.add(timestamp, deadline, usage, cores) + + if (timestamp != previousDeadline) { + // There is a gap between the previous and current fragment; fill the gap + builder.add(timestamp, 0.0, cores) + } + + builder.add(deadline, usage, cores) + previousDeadline = deadline } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 88e80719..8560b537 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -22,6 +22,7 @@ package org.opendc.compute.workload +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile import org.opendc.simulator.compute.workload.SimTrace import java.time.Instant import java.util.* @@ -37,6 +38,7 @@ import java.util.* * @param startTime The start time of the VM. * @param stopTime The stop time of the VM. * @param trace The trace that belong to this VM. + * @param interferenceProfile The interference profile of this virtual machine. */ public data class VirtualMachine( val uid: UUID, @@ -48,4 +50,5 @@ public data class VirtualMachine( val startTime: Instant, val stopTime: Instant, val trace: SimTrace, + val interferenceProfile: VmInterferenceProfile? ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index 1959c48d..9b2bec55 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } - val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } } + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } val res = mutableListOf<VirtualMachine>() - for ((fraction, w) in traces) { + for ((fraction, vms) in traces) { var currentLoad = 0.0 - for (entry in w.vms) { + for (entry in vms) { val entryLoad = entry.totalLoad if ((currentLoad + entryLoad) / totalLoad > fraction) { break @@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double } } - val vmCount = traces.sumOf { (_, w) -> w.vms.size } + val vmCount = traces.sumOf { (_, vms) -> vms.size } logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } - return ComputeWorkload.Resolved(res, null) + return res } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index 84a77f0f..52f4c672 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { - val (vms, interferenceModel) = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) val (hpc, nonHpc) = vms.partition { entry -> val name = entry.name @@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return ComputeWorkload.Resolved(res, interferenceModel) + return res } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index bc13560c..ef6de729 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { - val (vms, interferenceModel) = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) val res = mutableListOf<VirtualMachine>() val totalLoad = vms.sumOf { it.totalLoad } @@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return ComputeWorkload.Resolved(res, interferenceModel) + return res } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index dc9abaef..c20cb8f3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -24,13 +24,14 @@ package org.opendc.compute.workload.internal import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine import java.util.* /** * A [ComputeWorkload] from a trace. */ internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { return loader.get(name, format) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt index 45bd9ab1..a0ec4bd6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt @@ -42,7 +42,6 @@ import java.time.Instant * @param scope The [CoroutineScope] to run the reader in. * @param clock The virtual clock. * @param service The [ComputeService] to monitor. - * @param servers The [Server]s to monitor. * @param monitor The monitor to export the metrics to. * @param exportInterval The export interval. */ @@ -50,7 +49,6 @@ public class ComputeMetricReader( scope: CoroutineScope, clock: Clock, private val service: ComputeService, - private val servers: List<Server>, private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { @@ -76,6 +74,11 @@ public class ComputeMetricReader( */ private val job = scope.launch { val intervalMs = exportInterval.toMillis() + val service = service + val monitor = monitor + val hostTableReaders = hostTableReaders + val serverTableReaders = serverTableReaders + val serviceTableReader = serviceTableReader try { while (isActive) { @@ -91,7 +94,7 @@ public class ComputeMetricReader( reader.reset() } - for (server in servers) { + for (server in service.servers) { val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } reader.record(now) monitor.record(reader) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt index f3dc1e9e..87530f5a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt @@ -22,10 +22,9 @@ package org.opendc.compute.workload.topology -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.SimHypervisorProvider import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* /** @@ -36,7 +35,7 @@ import java.util.* * @param meta The metadata of the host. * @param model The physical model of the machine. * @param powerDriver The [PowerDriver] to model the power consumption of the machine. - * @param hypervisor The hypervisor implementation to use. + * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host. */ public data class HostSpec( val uid: UUID, @@ -44,5 +43,5 @@ public data class HostSpec( val meta: Map<String, Any>, val model: MachineModel, val powerDriver: PowerDriver, - val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider() + val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() ) |
