summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 18:15:09 +0200
committerGitHub <noreply@github.com>2021-10-03 18:15:09 +0200
commitb92d0e8703014f143ff0b1fe67de09fff6f867b1 (patch)
tree34238f56af20f0eb697f25ad5a700bab7fa4d6fb /opendc-compute
parent54bccf522e169d5cba6489291217f3307ae71094 (diff)
parent012fe8fa9be1676b8eef0cce795738a00c4260c0 (diff)
merge: Migrate to flow-based simulation for low-level models
This pull request converts the `opendc-simulator-resources` module into a flow simulator and adapts the existing low-level models (e.g., CPU, network, disk) to this new flow simulator. The flow simulator works differently from the uniform resource consumption model, in that it models flow through a system of connections, as opposed to resource consumptions. Concretely, this means that while in the uniform resource consumption model, consumptions with the same usage are propagated to the resources, in the flow simulator, only changes to the flow in the system are propagated. Overall, this leads to less updates in the system and therefore higher performance. The benchmarks shows that the new implementation obtains more than double the performance of the old implementation. We have focused in the new implementation on reducing the amount of work and memory allocations/loads/stores per updates. * Migrate from kotlinx-benchmark to jmh-gradle (for better profiling support) * Use longer traces for benchmarks (to prevent measuring the benchmark overhead) * Use direct field access for perf-sensitive code * Combine work and deadline to duration * Add support for pushing flow from context (to eliminate the allocation for every `SimResourceCommand`) * Reduce memory allocations in SimResourceInterpreter, by revamping the way timers are allocated. * Simplify max-min aggregator implementation (by utilizing the new push mechanism) * Invoke consumer callback on every invalidation (in order to propagate changes downstream) * Lazily push changes to resource context (by not updating the flow rate immediately after a push, but only after an update) * Remove onUpdate callback * Merge distributor and aggregator into switch * Separate push and pull flags * Remove failure callback from FlowSource * Create separate callbacks for remaining events * Make convergence callback optional * Reduce field accesses in FlowConsumerContextImpl * Optimize hot path in SimTraceWorkload * Expose CPU time counters directly on hypervisor * Optimize telemetry collection **Breaking API Changes** * The entire `opendc-simulator-resources` module has been replaced by the `opendc-simulator-flow` module. * `SimHypervisor.Listener` has been removed in favour of a new interface that exposes the performance counters of the hypervisor directly. To listen for convergence, use `FlowConvergenceListener`.
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt244
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt150
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt30
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt10
5 files changed, 220 insertions, 216 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index fdb3f1dc..b9d02185 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -47,11 +47,9 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.resources.SimResourceDistributorMaxMin
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.math.roundToLong
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -62,9 +60,9 @@ public class SimHost(
model: MachineModel,
override val meta: Map<String, Any>,
context: CoroutineContext,
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
meterProvider: MeterProvider,
- hypervisor: SimHypervisorProvider,
+ hypervisorProvider: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
@@ -79,7 +77,7 @@ public class SimHost(
/**
* The clock instance used by the host.
*/
- private val clock = interpreter.clock
+ private val clock = engine.clock
/**
* The logger instance of this server.
@@ -99,39 +97,19 @@ public class SimHost(
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model.optimize(), powerDriver)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver)
/**
* The hypervisor to run multiple workloads.
*/
- private val hypervisor: SimHypervisor = hypervisor.create(
- interpreter,
- scalingGovernor = scalingGovernor,
- interferenceDomain = interferenceDomain,
- listener = object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- _cpuDemand = cpuDemand
- _cpuUsage = cpuUsage
-
- collectTime()
- }
- }
- )
- private var _cpuUsage = 0.0
- private var _cpuDemand = 0.0
+ private val hypervisor: SimHypervisor = hypervisorProvider
+ .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain)
/**
* The virtual machines running on the hypervisor.
*/
private val guests = HashMap<Server, Guest>()
+ private val _guests = mutableListOf<Guest>()
override val state: HostState
get() = _state
@@ -158,22 +136,13 @@ public class SimHost(
}
}
+ /**
+ * The [Job] that represents the machine running the hypervisor.
+ */
+ private var _job: Job? = null
+
init {
- // Launch hypervisor onto machine
- scope.launch {
- try {
- _bootTime = clock.millis()
- _state = HostState.UP
- machine.run(this@SimHost.hypervisor, emptyMap())
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- logger.error(cause) { "Host failed" }
- throw cause
- } finally {
- _state = HostState.DOWN
- }
- }
+ launch()
meter.upDownCounterBuilder("system.guests")
.setDescription("Number of guests on this host")
@@ -185,15 +154,15 @@ public class SimHost(
meter.gaugeBuilder("system.cpu.demand")
.setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(_cpuDemand) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) }
meter.gaugeBuilder("system.cpu.usage")
.setDescription("Amount of CPU resources used by the host")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(_cpuUsage) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) }
meter.gaugeBuilder("system.cpu.utilization")
.setDescription("Utilization of the CPU resources of the host")
.setUnit("%")
- .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) }
meter.counterBuilder("system.cpu.time")
.setDescription("Amount of CPU time spent by the host")
.setUnit("s")
@@ -201,16 +170,16 @@ public class SimHost(
meter.gaugeBuilder("system.power.usage")
.setDescription("Power usage of the host ")
.setUnit("W")
- .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ .buildWithCallback { result -> result.observe(machine.powerUsage) }
meter.counterBuilder("system.power.total")
.setDescription("Amount of energy used by the CPU")
.setUnit("J")
.ofDoubles()
- .buildWithCallback(::collectPowerTotal)
+ .buildWithCallback { result -> result.observe(machine.energyUsage) }
meter.counterBuilder("system.time")
.setDescription("The uptime of the host")
.setUnit("s")
- .buildWithCallback(::collectTime)
+ .buildWithCallback(::collectUptime)
meter.gaugeBuilder("system.time.boot")
.setDescription("The boot time of the host")
.setUnit("1")
@@ -231,7 +200,7 @@ public class SimHost(
require(canFit(key)) { "Server does not fit" }
val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
- Guest(
+ val newGuest = Guest(
scope.coroutineContext,
clock,
this,
@@ -240,6 +209,9 @@ public class SimHost(
server,
machine
)
+
+ _guests.add(newGuest)
+ newGuest
}
if (start) {
@@ -263,7 +235,7 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests[server] ?: return
- guest.terminate()
+ guest.delete()
}
override fun addListener(listener: HostListener) {
@@ -285,30 +257,61 @@ public class SimHost(
public suspend fun fail() {
reset()
- for (guest in guests.values) {
+ for (guest in _guests) {
guest.fail()
}
}
public suspend fun recover() {
- collectTime()
- _state = HostState.UP
- _bootTime = clock.millis()
+ updateUptime()
+
+ launch()
+
+ // Wait for the hypervisor to launch before recovering the guests
+ yield()
- for (guest in guests.values) {
+ for (guest in _guests) {
guest.recover()
}
}
/**
+ * Launch the hypervisor.
+ */
+ private fun launch() {
+ check(_job == null) { "Concurrent hypervisor running" }
+
+ // Launch hypervisor onto machine
+ _job = scope.launch {
+ try {
+ _bootTime = clock.millis()
+ _state = HostState.UP
+ machine.run(hypervisor, emptyMap())
+ } catch (_: CancellationException) {
+ // Ignored
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Host failed" }
+ throw cause
+ } finally {
+ _state = HostState.DOWN
+ }
+ }
+ }
+
+ /**
* Reset the machine.
*/
private fun reset() {
- collectTime()
+ updateUptime()
+
+ // Stop the hypervisor
+ val job = _job
+ if (job != null) {
+ job.cancel()
+ _job = null
+ }
_state = HostState.DOWN
- _cpuUsage = 0.0
- _cpuDemand = 0.0
}
/**
@@ -358,11 +361,17 @@ public class SimHost(
var error = 0L
var invalid = 0L
- for ((_, guest) in guests) {
+ val guests = _guests.listIterator()
+ for (guest in guests) {
when (guest.state) {
ServerState.TERMINATED -> terminated++
ServerState.RUNNING -> running++
ServerState.ERROR -> error++
+ ServerState.DELETED -> {
+ // Remove guests that have been deleted
+ this.guests.remove(guest.server)
+ guests.remove()
+ }
else -> invalid++
}
}
@@ -381,24 +390,9 @@ public class SimHost(
private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
result.observe(_cpuLimit)
- for (guest in guests.values) {
- guest.collectCpuLimit(result)
- }
- }
-
- private var _lastCpuTimeCallback = clock.millis()
-
- /**
- * Helper function to track the CPU time of a machine.
- */
- private fun collectCpuTime(result: ObservableLongMeasurement) {
- val now = clock.millis()
- val duration = now - _lastCpuTimeCallback
-
- try {
- collectCpuTime(duration, result)
- } finally {
- _lastCpuTimeCallback = now
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectCpuLimit(result)
}
}
@@ -406,50 +400,22 @@ public class SimHost(
private val _stealState = Attributes.of(STATE_KEY, "steal")
private val _lostState = Attributes.of(STATE_KEY, "lost")
private val _idleState = Attributes.of(STATE_KEY, "idle")
- private var _totalTime = 0.0
/**
* Helper function to track the CPU time of a machine.
*/
- private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
- val coreCount = this.model.cpuCount
- val d = coreCount / _cpuLimit
-
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
val counters = hypervisor.counters
- val grantedWork = counters.actual
- val overcommittedWork = counters.overcommit
- val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0
-
- _totalTime += (duration / 1000.0) * coreCount
- val activeTime = (grantedWork * d).roundToLong()
- val idleTime = (_totalTime - grantedWork * d).roundToLong()
- val stealTime = (overcommittedWork * d).roundToLong()
- val lostTime = (interferedWork * d).roundToLong()
-
- result.observe(activeTime, _activeState)
- result.observe(idleTime, _idleState)
- result.observe(stealTime, _stealState)
- result.observe(lostTime, _lostState)
-
- for (guest in guests.values) {
- guest.collectCpuTime(duration, result)
- }
- }
- private var _lastPowerCallback = clock.millis()
- private var _totalPower = 0.0
+ result.observe(counters.cpuActiveTime / 1000L, _activeState)
+ result.observe(counters.cpuIdleTime / 1000L, _idleState)
+ result.observe(counters.cpuStealTime / 1000L, _stealState)
+ result.observe(counters.cpuLostTime / 1000L, _lostState)
- /**
- * Helper function to collect the total power usage of the machine.
- */
- private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
- val now = clock.millis()
- val duration = now - _lastPowerCallback
-
- _totalPower += duration / 1000.0 * machine.powerDraw
- result.observe(_totalPower)
-
- _lastPowerCallback = now
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectCpuTime(result)
+ }
}
private var _lastReport = clock.millis()
@@ -457,14 +423,21 @@ public class SimHost(
/**
* Helper function to track the uptime of a machine.
*/
- private fun collectTime(result: ObservableLongMeasurement? = null) {
+ private fun updateUptime() {
val now = clock.millis()
val duration = now - _lastReport
+ _lastReport = now
- try {
- collectTime(duration, result)
- } finally {
- _lastReport = now
+ if (_state == HostState.UP) {
+ _uptime += duration
+ } else if (_state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
+ }
+
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].updateUptime(duration)
}
}
@@ -476,19 +449,15 @@ public class SimHost(
/**
* Helper function to track the uptime of a machine.
*/
- private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
- if (state == HostState.UP) {
- _uptime += duration
- } else if (state == HostState.DOWN && scope.isActive) {
- // Only increment downtime if the machine is in a failure state
- _downtime += duration
- }
+ private fun collectUptime(result: ObservableLongMeasurement) {
+ updateUptime()
- result?.observe(_uptime, _upState)
- result?.observe(_downtime, _downState)
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
- for (guest in guests.values) {
- guest.collectUptime(duration, result)
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectUptime(result)
}
}
@@ -497,13 +466,14 @@ public class SimHost(
/**
* Helper function to track the boot time of a machine.
*/
- private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ private fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result?.observe(_bootTime)
+ result.observe(_bootTime)
}
- for (guest in guests.values) {
- guest.collectBootTime(result)
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 7f33154a..5ea1860d 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -24,6 +24,7 @@ package org.opendc.compute.simulator.internal
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.common.AttributesBuilder
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
import io.opentelemetry.api.metrics.ObservableLongMeasurement
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
@@ -33,12 +34,10 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimWorkloadMapper
-import org.opendc.simulator.compute.SimAbstractMachine
-import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.kernel.SimVirtualMachine
import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Clock
import kotlin.coroutines.CoroutineContext
-import kotlin.math.roundToLong
/**
* A virtual machine instance that is managed by a [SimHost].
@@ -50,7 +49,7 @@ internal class Guest(
private val mapper: SimWorkloadMapper,
private val listener: GuestListener,
val server: Server,
- val machine: SimMachine
+ val machine: SimVirtualMachine
) {
/**
* The [CoroutineScope] of the guest.
@@ -73,17 +72,7 @@ internal class Guest(
/**
* The attributes of the guest.
*/
- val attributes: Attributes = Attributes.builder()
- .put(ResourceAttributes.HOST_NAME, server.name)
- .put(ResourceAttributes.HOST_ID, server.uid.toString())
- .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
- .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
- .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
- .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
- .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
- .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
- .build()
+ val attributes: Attributes = GuestAttributes(this)
/**
* Start the guest.
@@ -116,12 +105,12 @@ internal class Guest(
}
/**
- * Terminate the guest.
+ * Delete the guest.
*
* This operation will stop the guest if it is running on the host and remove all resources associated with the
* guest.
*/
- suspend fun terminate() {
+ suspend fun delete() {
stop()
state = ServerState.DELETED
@@ -198,7 +187,7 @@ internal class Guest(
}
/**
- * Run the process that models the virtual machine lifecycle as a coroutine.
+ * Converge the process that models the virtual machine lifecycle as a coroutine.
*/
private suspend fun runMachine(workload: SimWorkload) {
delay(1) // TODO Introduce model for boot time
@@ -226,27 +215,30 @@ internal class Guest(
private var _uptime = 0L
private var _downtime = 0L
- private val _upState = Attributes.builder()
- .putAll(attributes)
+ private val _upState = attributes.toBuilder()
.put(STATE_KEY, "up")
.build()
- private val _downState = Attributes.builder()
- .putAll(attributes)
+ private val _downState = attributes.toBuilder()
.put(STATE_KEY, "down")
.build()
/**
- * Helper function to track the uptime of the guest.
+ * Helper function to track the uptime and downtime of the guest.
*/
- fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ fun updateUptime(duration: Long) {
if (state == ServerState.RUNNING) {
_uptime += duration
} else if (state == ServerState.ERROR) {
_downtime += duration
}
+ }
- result?.observe(_uptime, _upState)
- result?.observe(_downtime, _downState)
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(result: ObservableLongMeasurement) {
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
}
private var _bootTime = Long.MIN_VALUE
@@ -254,55 +246,35 @@ internal class Guest(
/**
* Helper function to track the boot time of the guest.
*/
- fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result?.observe(_bootTime)
+ result.observe(_bootTime)
}
}
- private val _activeState = Attributes.builder()
- .putAll(attributes)
+ private val _activeState = attributes.toBuilder()
.put(STATE_KEY, "active")
.build()
- private val _stealState = Attributes.builder()
- .putAll(attributes)
+ private val _stealState = attributes.toBuilder()
.put(STATE_KEY, "steal")
.build()
- private val _lostState = Attributes.builder()
- .putAll(attributes)
+ private val _lostState = attributes.toBuilder()
.put(STATE_KEY, "lost")
.build()
- private val _idleState = Attributes.builder()
- .putAll(attributes)
+ private val _idleState = attributes.toBuilder()
.put(STATE_KEY, "idle")
.build()
- private var _totalTime = 0.0
/**
* Helper function to track the CPU time of a machine.
*/
- fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
- val coreCount = server.flavor.cpuCount
- val d = coreCount / _cpuLimit
-
- var grantedWork = 0.0
- var overcommittedWork = 0.0
+ fun collectCpuTime(result: ObservableLongMeasurement) {
+ val counters = machine.counters
- for (cpu in (machine as SimAbstractMachine).cpus) {
- val counters = cpu.counters
- grantedWork += counters.actual
- overcommittedWork += counters.overcommit
- }
-
- _totalTime += (duration / 1000.0) * coreCount
- val activeTime = (grantedWork * d).roundToLong()
- val idleTime = (_totalTime - grantedWork * d).roundToLong()
- val stealTime = (overcommittedWork * d).roundToLong()
-
- result.observe(activeTime, _activeState)
- result.observe(idleTime, _idleState)
- result.observe(stealTime, _stealState)
- result.observe(0, _lostState)
+ result.observe(counters.cpuActiveTime / 1000, _activeState)
+ result.observe(counters.cpuIdleTime / 1000, _idleState)
+ result.observe(counters.cpuStealTime / 1000, _stealState)
+ result.observe(counters.cpuLostTime / 1000, _lostState)
}
private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
@@ -313,4 +285,66 @@ internal class Guest(
fun collectCpuLimit(result: ObservableDoubleMeasurement) {
result.observe(_cpuLimit, attributes)
}
+
+ /**
+ * An optimized [Attributes] implementation.
+ */
+ private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes {
+ /**
+ * Construct a [GuestAttributes] instance from a [Guest].
+ */
+ constructor(guest: Guest) : this(
+ guest.server.uid.toString(),
+ Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, guest.server.name)
+ .put(ResourceAttributes.HOST_ID, guest.server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString())
+ .build()
+ )
+
+ override fun <T : Any?> get(key: AttributeKey<T>): T? {
+ // Optimize access to the HOST_ID key which is accessed quite often
+ if (key == ResourceAttributes.HOST_ID) {
+ @Suppress("UNCHECKED_CAST")
+ return uid as T?
+ }
+ return attributes.get(key)
+ }
+
+ override fun toBuilder(): AttributesBuilder {
+ val delegate = attributes.toBuilder()
+ return object : AttributesBuilder {
+
+ override fun putAll(attributes: Attributes): AttributesBuilder {
+ delegate.putAll(attributes)
+ return this
+ }
+
+ override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder {
+ delegate.put<T>(key, value)
+ return this
+ }
+
+ override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder {
+ delegate.put(key, value)
+ return this
+ }
+
+ override fun build(): Attributes = GuestAttributes(uid, delegate.build())
+ }
+ }
+
+ override fun equals(other: Any?): Boolean = attributes == other
+
+ // Cache hash code
+ private val _hash = attributes.hashCode()
+
+ override fun hashCode(): Int = _hash
+ }
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
index 6919b7fd..7d46e626 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
@@ -75,7 +75,7 @@ internal class HostFaultInjectorImpl(
}
/**
- * Run the injection process.
+ * Converge the injection process.
*/
private suspend fun runInjector() {
while (true) {
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e75c31a0..26089b6d 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -41,7 +41,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.HOST_ID
import org.opendc.telemetry.compute.table.HostData
@@ -87,14 +87,14 @@ internal class SimHostTest {
.setClock(clock.toOtelClock())
.build()
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val virtDriver = SimHost(
uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
- interpreter,
+ engine,
meterProvider,
SimFairShareHypervisorProvider()
)
@@ -170,9 +170,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(659, activeTime, "Active time does not match") },
- { assertEquals(2342, idleTime, "Idle time does not match") },
- { assertEquals(638, stealTime, "Steal time does not match") },
+ { assertEquals(658, activeTime, "Active time does not match") },
+ { assertEquals(1741, idleTime, "Idle time does not match") },
+ { assertEquals(637, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -199,14 +199,14 @@ internal class SimHostTest {
.setClock(clock.toOtelClock())
.build()
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val host = SimHost(
uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
- interpreter,
+ engine,
meterProvider,
SimFairShareHypervisorProvider()
)
@@ -253,7 +253,7 @@ internal class SimHostTest {
host.spawn(server)
delay(5000L)
host.fail()
- delay(5000L)
+ delay(duration * 1000)
host.recover()
suspendCancellableCoroutine<Unit> { cont ->
@@ -274,12 +274,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2661, idleTime, "Idle time does not match") },
- { assertEquals(339, activeTime, "Active time does not match") },
- { assertEquals(1195001, uptime, "Uptime does not match") },
- { assertEquals(5000, downtime, "Downtime does not match") },
- { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
- { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
+ { assertEquals(1175, idleTime, "Idle time does not match") },
+ { assertEquals(624, activeTime, "Active time does not match") },
+ { assertEquals(900001, uptime, "Uptime does not match") },
+ { assertEquals(300000, downtime, "Downtime does not match") },
+ { assertEquals(900000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(300000, guestDowntime, "Guest downtime does not match") },
)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index ed45bd8a..283f82fe 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -36,7 +36,7 @@ import org.opendc.compute.simulator.SimHost
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.compute.*
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
@@ -73,9 +73,9 @@ public class ComputeWorkloadRunner(
private val _metricProducers = mutableListOf<MetricProducer>()
/**
- * The [SimResourceInterpreter] to simulate the hosts.
+ * The [FlowEngine] to simulate the hosts.
*/
- private val interpreter = SimResourceInterpreter(context, clock)
+ private val engine = FlowEngine(context, clock)
/**
* The hosts that belong to this class.
@@ -89,7 +89,7 @@ public class ComputeWorkloadRunner(
}
/**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
+ * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*/
public suspend fun run(trace: List<VirtualMachine>, seed: Long) {
val random = Random(seed)
@@ -178,7 +178,7 @@ public class ComputeWorkloadRunner(
spec.model,
spec.meta,
context,
- interpreter,
+ engine,
meterProvider,
spec.hypervisor,
powerDriver = spec.powerDriver,