diff options
58 files changed, 1738 insertions, 943 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt index fc092a3f..f3b94e3d 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -25,7 +25,12 @@ package org.opendc.compute.service.driver /** * Describes the static machine properties of the host. * + * @property cpuCapacity The total CPU capacity of the host in MHz. * @property cpuCount The number of logical processing cores available for this host. - * @property memorySize The amount of memory available for this host in MB. + * @property memoryCapacity The amount of memory available for this host in MB. */ -public data class HostModel(public val cpuCount: Int, public val memorySize: Long) +public data class HostModel( + public val cpuCapacity: Double, + public val cpuCount: Int, + public val memoryCapacity: Long +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 57e70fcd..292feabe 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -26,6 +26,7 @@ import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -173,6 +174,12 @@ 
internal class ComputeServiceImpl( result.observe(available, upState) result.observe(total - available, downState) } + + meter.gaugeBuilder("system.time.provision") + .setDescription("The most recent timestamp where the server entered a provisioned state") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectProvisionTime) } override fun newClient(): ComputeClient { @@ -298,7 +305,7 @@ internal class ComputeServiceImpl( val hv = HostView(host) maxCores = max(maxCores, host.model.cpuCount) - maxMemory = max(maxMemory, host.model.memorySize) + maxMemory = max(maxMemory, host.model.memoryCapacity) hostToView[host] = hv if (host.state == HostState.UP) { @@ -324,8 +331,10 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } + val now = clock.millis() + val request = SchedulingRequest(server, now) - val request = SchedulingRequest(server, clock.millis()) + server.lastProvisioningTimestamp = now queue.add(request) _serversPending.add(1) requestSchedulingCycle() @@ -501,4 +510,13 @@ internal class ComputeServiceImpl( requestSchedulingCycle() } } + + /** + * Collect the timestamp when each server entered its provisioning state most recently. 
+ */ + private fun collectProvisionTime(result: ObservableLongMeasurement) { + for ((_, server) in servers) { + result.observe(server.lastProvisioningTimestamp, server.attributes) + } + } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index e2f33f11..0876209a 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -37,7 +37,7 @@ public class HostView(public val host: Host) { get() = host.uid public var instanceCount: Int = 0 - public var availableMemory: Long = host.model.memorySize + public var availableMemory: Long = host.model.memoryCapacity public var provisionedCores: Int = 0 override fun toString(): String = "HostView[host=$host]" diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index 05a7e1bf..f1b92c66 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -55,7 +55,7 @@ internal class InternalServer( /** * The attributes of a server. */ - internal val attributes: Attributes = Attributes.builder() + @JvmField internal val attributes: Attributes = Attributes.builder() .put(ResourceAttributes.HOST_NAME, name) .put(ResourceAttributes.HOST_ID, uid.toString()) .put(ResourceAttributes.HOST_TYPE, flavor.name) @@ -70,7 +70,12 @@ internal class InternalServer( /** * The [Host] that has been assigned to host the server. */ - internal var host: Host? 
= null + @JvmField internal var host: Host? = null + + /** + * The most recent timestamp when the server entered a provisioning state. + */ + @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE /** * The current scheduling request. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt index a470a453..8a7a646c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -34,7 +34,7 @@ public class RamFilter(private val allocationRatio: Double) : HostFilter { override fun test(host: HostView, server: Server): Boolean { val requested = server.flavor.memorySize val available = host.availableMemory - val total = host.host.model.memorySize + val total = host.host.model.memoryCapacity // Do not allow an instance to overcommit against itself, only against // other instances. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt index 708ddede..791710c8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt @@ -20,25 +20,21 @@ * SOFTWARE. 
*/ -package org.opendc.simulator.compute.kernel.interference +package org.opendc.compute.service.scheduler.filters + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView /** - * A group of virtual machines that together can interfere when operating on the same resources, causing performance - * variability. + * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [Server] and the available + * capacity on the host. */ -public data class VmInterferenceGroup( - /** - * The minimum load of the host before the interference occurs. - */ - public val targetLoad: Double, - - /** - * A score in [0, 1] representing the performance variability as a result of resource interference. - */ - public val score: Double, +public class VCpuCapacityFilter : HostFilter { + override fun test(host: HostView, server: Server): Boolean { + val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double + val hostModel = host.host.model + val availableCapacity = hostModel.cpuCapacity / hostModel.cpuCount - /** - * The members of this interference group. - */ - public val members: Set<String> -) + return requiredCapacity == null || availableCapacity >= (requiredCapacity / server.flavor.cpuCount) + } +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt index 8e787b97..a86226e2 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt @@ -20,31 +20,21 @@ * SOFTWARE. 
*/ -package org.opendc.telemetry.compute.table +package org.opendc.compute.service.scheduler.weights -import java.time.Instant +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView /** - * A trace entry for a particular host. + * A [HostWeigher] that weighs the hosts based on the difference required vCPU capacity and the available CPU capacity. */ -public data class HostData( - val timestamp: Instant, - val host: HostInfo, - val guestsTerminated: Int, - val guestsRunning: Int, - val guestsError: Int, - val guestsInvalid: Int, - val cpuLimit: Double, - val cpuUsage: Double, - val cpuDemand: Double, - val cpuUtilization: Double, - val cpuActiveTime: Long, - val cpuIdleTime: Long, - val cpuStealTime: Long, - val cpuLostTime: Long, - val powerUsage: Double, - val powerTotal: Double, - val uptime: Long, - val downtime: Long, - val bootTime: Instant? -) +public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher { + + override fun getWeight(host: HostView, server: Server): Double { + val model = host.host.model + val requiredCapacity = server.flavor.meta["cpu-capacity"] as? 
Double ?: 0.0 + return model.cpuCapacity / model.cpuCount - requiredCapacity / server.flavor.cpuCount + } + + override fun toString(): String = "VCpuWeigher" +} diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 564f9493..7b8d0fe2 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -125,7 +125,7 @@ internal class ComputeServiceTest { fun testAddHost() = scope.runBlockingSimulation { val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP assertEquals(0, service.hostCount) @@ -147,7 +147,7 @@ internal class ComputeServiceTest { fun testAddHostDouble() = scope.runBlockingSimulation { val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN assertEquals(0, service.hostCount) @@ -216,7 +216,7 @@ internal class ComputeServiceTest { fun testServerCannotFitOnHost() = scope.runBlockingSimulation { val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns false @@ -241,7 +241,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN every { 
host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } every { host.canFit(any()) } returns false @@ -272,7 +272,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } every { host.canFit(any()) } returns false @@ -303,7 +303,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } @@ -326,7 +326,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } @@ -369,7 +369,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } diff --git 
a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt index cafd4498..3f2ce43b 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -33,10 +33,7 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.internal.HostView -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.InstanceCountFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.filters.* import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher @@ -183,12 +180,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.availableMemory } returns 512 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 2048 scheduler.addHost(hostA) @@ -210,7 +207,7 @@ internal class FilterSchedulerTest { val host = mockk<HostView>() every { host.host.state } returns HostState.UP - every { host.host.model } returns HostModel(4, 
2048) + every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.availableMemory } returns 2048 scheduler.addHost(host) @@ -231,12 +228,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.provisionedCores } returns 3 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.provisionedCores } returns 0 scheduler.addHost(hostA) @@ -258,7 +255,7 @@ internal class FilterSchedulerTest { val host = mockk<HostView>() every { host.host.state } returns HostState.UP - every { host.host.model } returns HostModel(4, 2048) + every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.provisionedCores } returns 0 scheduler.addHost(host) @@ -271,6 +268,34 @@ internal class FilterSchedulerTest { } @Test + fun testVCpuCapacityFilter() { + val scheduler = FilterScheduler( + filters = listOf(VCpuCapacityFilter()), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(8 * 2600.0, 8, 2048) + every { hostA.availableMemory } returns 512 + scheduler.addHost(hostA) + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4 * 3200.0, 4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + every { server.flavor.meta } returns mapOf("cpu-capacity" to 2 * 3200.0) + + assertEquals(hostB, scheduler.select(server)) + } + + @Test fun testInstanceCountFilter() { val 
scheduler = FilterScheduler( filters = listOf(InstanceCountFilter(limit = 2)), @@ -279,12 +304,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.instanceCount } returns 2 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.instanceCount } returns 0 scheduler.addHost(hostA) @@ -306,12 +331,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.availableMemory } returns 1024 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 512 scheduler.addHost(hostA) @@ -333,12 +358,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(12, 2048) + every { hostA.host.model } returns HostModel(12 * 2600.0, 12, 2048) every { hostA.availableMemory } returns 1024 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 512 scheduler.addHost(hostA) @@ -360,12 +385,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + 
every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.provisionedCores } returns 2 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.provisionedCores } returns 0 scheduler.addHost(hostA) @@ -387,12 +412,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.instanceCount } returns 2 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.instanceCount } returns 0 scheduler.addHost(hostA) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index b9d02185..908a58e9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -47,6 +47,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext @@ -121,7 +122,7 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) + override val model: HostModel = 
HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size }) /** * The [GuestListener] that listens for guest events. @@ -136,11 +137,6 @@ public class SimHost( } } - /** - * The [Job] that represents the machine running the hypervisor. - */ - private var _job: Job? = null - init { launch() @@ -188,7 +184,7 @@ public class SimHost( } override fun canFit(server: Server): Boolean { - val sufficientMemory = model.memorySize >= server.flavor.memorySize + val sufficientMemory = model.memoryCapacity >= server.flavor.memorySize val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) @@ -199,11 +195,12 @@ public class SimHost( val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name) val newGuest = Guest( scope.coroutineContext, clock, this, + hypervisor, mapper, guestListener, server, @@ -249,7 +246,7 @@ public class SimHost( override fun close() { reset() scope.cancel() - machine.close() + machine.cancel() } override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" @@ -276,26 +273,39 @@ public class SimHost( } /** + * The [Job] that represents the machine running the hypervisor. + */ + private var _ctx: SimMachineContext? = null + + /** * Launch the hypervisor. 
*/ private fun launch() { - check(_job == null) { "Concurrent hypervisor running" } + check(_ctx == null) { "Concurrent hypervisor running" } // Launch hypervisor onto machine - _job = scope.launch { - try { - _bootTime = clock.millis() - _state = HostState.UP - machine.run(hypervisor, emptyMap()) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - logger.error(cause) { "Host failed" } - throw cause - } finally { - _state = HostState.DOWN + _ctx = machine.startWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + _bootTime = clock.millis() + _state = HostState.UP + hypervisor.onStart(ctx) + } catch (cause: Throwable) { + _state = HostState.DOWN + _ctx = null + throw cause + } } - } + + override fun onStop(ctx: SimMachineContext) { + try { + hypervisor.onStop(ctx) + } finally { + _state = HostState.DOWN + _ctx = null + } + } + }) } /** @@ -305,12 +315,7 @@ public class SimHost( updateUptime() // Stop the hypervisor - val job = _job - if (job != null) { - job.cancel() - _job = null - } - + _ctx?.close() _state = HostState.DOWN } @@ -319,8 +324,9 @@ public class SimHost( */ private fun Flavor.toMachineModel(): MachineModel { val originalCpu = machine.model.cpus[0] + val cpuCapacity = (this.meta["cpu-capacity"] as? 
Double ?: Double.MAX_VALUE).coerceAtMost(originalCpu.frequency) val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) return MachineModel(processingUnits, memoryUnits).optimize() diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 5ea1860d..9f3122db 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -34,7 +34,9 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimVirtualMachine +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -46,6 +48,7 @@ internal class Guest( context: CoroutineContext, private val clock: Clock, val host: SimHost, + private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper, private val listener: GuestListener, val server: Server, @@ -114,8 +117,7 @@ internal class Guest( stop() state = ServerState.DELETED - - machine.close() + hypervisor.removeMachine(machine) scope.cancel() } @@ -191,7 +193,7 @@ internal class Guest( */ private suspend fun runMachine(workload: SimWorkload) { delay(1) // TODO Introduce model for boot time - 
machine.run(workload, mapOf("driver" to host, "server" to server)) + machine.runWorkload(workload, mapOf("driver" to host, "server" to server)) } /** @@ -248,7 +250,7 @@ internal class Guest( */ fun collectBootTime(result: ObservableLongMeasurement) { if (_bootTime != Long.MIN_VALUE) { - result.observe(_bootTime) + result.observe(_bootTime, attributes) } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a0ff9228..799a8cf0 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -46,8 +46,8 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.HOST_ID -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration @@ -140,10 +140,10 @@ internal class SimHostTest { val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - stealTime += data.cpuStealTime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + stealTime += reader.cpuStealTime } }, exportInterval = Duration.ofSeconds(duration) @@ -236,16 +236,16 @@ internal class SimHostTest { val reader = 
CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - uptime += data.uptime - downtime += data.downtime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + uptime += reader.uptime + downtime += reader.downtime } - override fun record(data: ServerData) { - guestUptime += data.uptime - guestDowntime += data.downtime + override fun record(reader: ServerTableReader) { + guestUptime += reader.uptime + guestDowntime += reader.downtime } }, exportInterval = Duration.ofSeconds(duration) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 1a6624f7..f23becda 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -92,7 +92,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val idCol = reader.resolve(RESOURCE_ID) val startTimeCol = reader.resolve(RESOURCE_START_TIME) val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) - val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) var counter = 0 @@ -108,8 +109,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val submissionTime = reader.get(startTimeCol) as Instant val endTime = reader.get(stopTimeCol) as Instant - val maxCores = reader.getInt(coresCol) - val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB + val cpuCount = 
reader.getInt(cpuCountCol) + val cpuCapacity = reader.getDouble(cpuCapacityCol) + val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val builder = fragments.getValue(id) @@ -119,8 +121,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) { VirtualMachine( uid, id, - maxCores, - requiredMemory.roundToLong(), + cpuCount, + cpuCapacity, + memCapacity.roundToLong(), totalLoad, submissionTime, endTime, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt index 283f82fe..90ee56cb 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -128,7 +128,8 @@ public class ComputeWorkloadRunner( client.newFlavor( entry.name, entry.cpuCount, - entry.memCapacity + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() ), meta = mapOf("workload" to workload) ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 5dd239f6..88e80719 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -31,8 +31,9 @@ import java.util.* * * @param uid The unique identifier of the virtual machine. * @param name The name of the virtual machine. + * @param cpuCapacity The required CPU capacity for the VM in MHz. 
* @param cpuCount The number of vCPUs in the VM. - * @param memCapacity The provisioned memory for the VM. + * @param memCapacity The provisioned memory for the VM in MB. * @param startTime The start time of the VM. * @param stopTime The stop time of the VM. * @param trace The trace that belong to this VM. @@ -41,6 +42,7 @@ public data class VirtualMachine( val uid: UUID, val name: String, val cpuCount: Int, + val cpuCapacity: Double, val memCapacity: Long, val totalLoad: Double, val startTime: Instant, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt index ad182d67..a46885f4 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import io.opentelemetry.sdk.common.CompletableResultCode import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader import java.io.File /** @@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS bufferSize ) - override fun record(data: ServerData) { - serverWriter.write(data) + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) } - override fun record(data: HostData) { - 
hostWriter.write(data) + override fun record(reader: HostTableReader) { + hostWriter.write(reader) } - override fun record(data: ServiceData) { - serviceWriter.write(data) + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) } override fun shutdown(): CompletableResultCode { diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index 4172d729..84387bbc 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -50,9 +50,9 @@ public abstract class ParquetDataWriter<in T>( private val logger = KotlinLogging.logger {} /** - * The queue of commands to process. + * The queue of records to process. */ - private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) + private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize) /** * An exception to be propagated to the actual writer. 
@@ -72,20 +72,20 @@ public abstract class ParquetDataWriter<in T>( } val queue = queue - val buf = mutableListOf<T>() + val buf = mutableListOf<GenericData.Record>() var shouldStop = false try { while (!shouldStop) { try { - process(writer, queue.take()) + writer.write(queue.take()) } catch (e: InterruptedException) { shouldStop = true } if (queue.drainTo(buf) > 0) { for (data in buf) { - process(writer, data) + writer.write(data) } buf.clear() } @@ -119,7 +119,9 @@ public abstract class ParquetDataWriter<in T>( throw IllegalStateException("Writer thread failed", exception) } - queue.put(data) + val builder = GenericRecordBuilder(schema) + convert(builder, data) + queue.put(builder.build()) } /** @@ -133,13 +135,4 @@ public abstract class ParquetDataWriter<in T>( init { writerThread.start() } - - /** - * Process the specified [data] to be written to the Parquet file. - */ - private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { - val builder = GenericRecordBuilder(schema) - convert(builder, data) - writer.write(builder.build()) - } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 98a0739e..2b7cac8f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA 
import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [HostData]s. + * A Parquet event writer for [HostTableReader]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { return builder @@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: HostData) { + override fun convert(builder: GenericRecordBuilder, data: HostTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["host_id"] = data.host.id diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 0d11ec23..144b6624 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [ServerData]s. 
+ * A Parquet event writer for [ServerTableReader]s. */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { return builder @@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: ServerData) { + override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["server_id"] = data.server.id @@ -55,9 +55,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : builder["uptime"] = data.uptime builder["downtime"] = data.downtime - val bootTime = data.bootTime - builder["boot_time"] = bootTime?.toEpochMilli() - builder["scheduling_latency"] = data.schedulingLatency + builder["boot_time"] = data.bootTime?.toEpochMilli() + builder["provision_time"] = data.provisionTime?.toEpochMilli() builder["cpu_count"] = data.server.cpuCount builder["cpu_limit"] = data.cpuLimit @@ -81,8 +80,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .name("host_id").type(UUID_SCHEMA.optional()).noDefault() .requiredLong("uptime") .requiredLong("downtime") + .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredLong("scheduling_latency") .requiredInt("cpu_count") .requiredDouble("cpu_limit") .requiredLong("cpu_time_active") diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 47824b29..ec8a2b65 100644 --- 
a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecordBuilder -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import java.io.File /** - * A Parquet event writer for [ServiceData]s. + * A Parquet event writer for [ServiceTableReader]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) { - override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["hosts_up"] = data.hostsUp builder["hosts_down"] = data.hostsDown diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt deleted file mode 100644 index 67f9626c..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, 
copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup -import java.io.File -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -public class PerformanceInterferenceReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - init { - mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) - } - - /** - * Read the performance interface model from [file]. - */ - public fun read(file: File): List<VmInterferenceGroup> { - return mapper.readValue(file) - } - - /** - * Read the performance interface model from the input. 
- */ - public fun read(input: InputStream): List<VmInterferenceGroup> { - return mapper.readValue(input) - } - - private data class GroupMixin( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set<String>, - ) -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt new file mode 100644 index 00000000..e0fa8904 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.workload.util + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import java.io.File +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + */ +public class VmInterferenceModelReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + + /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(file) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Read the performance interface model from the input. + */ + public fun read(input: InputStream): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(input) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Parse all groups in an interference JSON file. + */ + private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) { + parser.nextToken() + + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") + } + + while (parser.nextToken() != JsonToken.END_ARRAY) { + parseGroup(parser, builder) + } + } + + /** + * Parse a group an interference JSON file. 
+ */ + private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) { + var targetLoad = Double.POSITIVE_INFINITY + var score = 1.0 + val members = mutableSetOf<String>() + + if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "vms" -> parseGroupMembers(parser, members) + "minServerLoad" -> targetLoad = parser.doubleValue + "performanceScore" -> score = parser.doubleValue + } + } + + builder.addGroup(members, targetLoad, score) + } + + /** + * Parse the members of a group. + */ + private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected string value for group member") + } + + val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt") + members.add(member) + } + } + + private data class Group( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set<String>, + ) +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt index c48bff3a..1c3e7149 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt @@ -20,24 +20,18 @@ * SOFTWARE. 
*/ -package org.opendc.telemetry.compute.table +package org.opendc.compute.workload.util -import java.time.Instant +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow /** - * A trace entry for a particular server. + * Test suite for the [VmInterferenceModelReader] class. */ -public data class ServerData( - val timestamp: Instant, - val server: ServerInfo, - val host: HostInfo?, - val uptime: Long, - val downtime: Long, - val bootTime: Instant?, - val schedulingLatency: Long, - val cpuLimit: Double, - val cpuActiveTime: Long, - val cpuIdleTime: Long, - val cpuStealTime: Long, - val cpuLostTime: Long, -) +class VmInterferenceModelReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json")) + assertDoesNotThrow { VmInterferenceModelReader().read(input) } + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 4e855f82..53c9de11 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -30,14 +30,13 @@ import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology 
import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader @@ -99,9 +98,8 @@ abstract class Portfolio(name: String) : Experiment(name) { val seeder = Random(repeat.toLong()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) - PerformanceInterferenceReader() + VmInterferenceModelReader() .read(File(config.getString("interference-model"))) - .let { VmInterferenceModel(it, Random(seeder.nextLong())) } else null @@ -116,7 +114,7 @@ abstract class Portfolio(name: String) : Experiment(name) { clock, computeScheduler, failureModel, - performanceInterferenceModel + performanceInterferenceModel?.withSeed(repeat.toLong()) ) val exporter = ParquetComputeMetricExporter( diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 94e92c1b..f3a6ed1a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,13 +34,12 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology -import 
org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration @@ -177,9 +176,9 @@ class CapelinIntegrationTest { val workload = createTestWorkload(1.0, seed) val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = - PerformanceInterferenceReader() + VmInterferenceModelReader() .read(perfInterferenceInput) - .let { VmInterferenceModel(it, Random(seed.toLong())) } + .withSeed(seed.toLong()) val simulator = ComputeWorkloadRunner( coroutineContext, @@ -213,7 +212,7 @@ class CapelinIntegrationTest { { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -285,13 +284,13 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L - override fun record(data: HostData) { - idleTime += data.cpuIdleTime - activeTime += data.cpuActiveTime - stealTime += data.cpuStealTime - lostTime += data.cpuLostTime - energyUsage += data.powerTotal - uptime += data.uptime + override fun record(reader: HostTableReader) { + idleTime += reader.cpuIdleTime + activeTime += reader.cpuActiveTime + stealTime += 
reader.cpuStealTime + lostTime += reader.cpuLostTime + energyUsage += reader.powerTotal + uptime += reader.uptime } } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index fb36d2c7..1752802f 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -34,6 +34,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* import java.time.Clock @@ -128,6 +129,8 @@ public class SimTFDevice( } } + override fun onStop(ctx: SimMachineContext) {} + override fun onStart(conn: FlowConnection, now: Long) { ctx = conn capacity = conn.capacity @@ -172,7 +175,7 @@ public class SimTFDevice( init { scope.launch { - machine.run(workload) + machine.runWorkload(workload) } } @@ -189,7 +192,7 @@ public class SimTFDevice( } override fun close() { - machine.close() + machine.cancel() scope.cancel() } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt index a8b04df4..77eadbbe 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt @@ -25,9 +25,9 @@ package org.opendc.faas.service.deployer import 
org.opendc.faas.service.FunctionObject /** - * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions. + * A [FunctionInstance] is a self-contained worker—typically a container—capable of handling function executions. * - * Multiple, concurrent function instances can exists for a single function, for scalability purposes. + * Multiple, concurrent function instances can exist for a single function, for scalability purposes. */ public interface FunctionInstance : AutoCloseable { /** diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 020d75b5..68233c1a 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -36,6 +36,7 @@ import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.flow.FlowEngine import java.time.Clock import java.util.ArrayDeque @@ -114,7 +115,7 @@ public class SimFunctionDeployer( override fun close() { state = FunctionInstanceState.Deleted stop() - machine.close() + machine.cancel() } override fun toString(): String = "FunctionInstance[state=$state]" @@ -130,7 +131,7 @@ public class SimFunctionDeployer( launch { try { - machine.run(workload) + machine.runWorkload(workload) } finally { state = FunctionInstanceState.Deleted } diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index a2bb89c2..ca8b912a 100644 --- 
a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -35,7 +35,7 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcUtils) + implementation(libs.kotlin.logging) testImplementation(libs.slf4j.simple) } diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index cb52d24f..91e91f9d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -76,7 +76,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - return@runBlockingSimulation machine.run(SimTraceWorkload(trace)) + return@runBlockingSimulation machine.runWorkload(SimTraceWorkload(trace)) } } @@ -89,15 +89,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -111,15 +111,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = 
hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -133,22 +133,22 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } coroutineScope { repeat(2) { - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) launch { try { - vm.run(SimTraceWorkload(trace)) + vm.runWorkload(SimTraceWorkload(trace)) } finally { - machine.close() + machine.cancel() } } } } - machine.close() + machine.cancel() } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt new file mode 100644 index 00000000..c23f48dc --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.simulator.compute.workload.SimWorkload +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Run the specified [SimWorkload] on this machine and suspend execution util [workload] has finished. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. 
+ */ +public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) { + return suspendCancellableCoroutine { cont -> + cont.invokeOnCancellation { this@runWorkload.cancel() } + + startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + workload.onStart(ctx) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + try { + workload.onStop(ctx) + + if (!cont.isCompleted) { + cont.resume(Unit) + } + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + }, + meta + ) + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 60a10f20..6a4c594d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* +import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral import org.opendc.simulator.compute.model.MachineModel @@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. 
@@ -72,48 +71,20 @@ public abstract class SimAbstractMachine( public override val peripherals: List<SimPeripheral> = net.map { it as SimNetworkAdapter } /** - * A flag to indicate that the machine is terminated. + * The current active [Context]. */ - private var isTerminated = false + private var _ctx: Context? = null - /** - * The continuation to resume when the virtual machine workload has finished. - */ - private var cont: Continuation<Unit>? = null - - /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - check(!isTerminated) { "Machine is terminated" } - check(cont == null) { "A machine cannot run concurrently" } - - val ctx = Context(meta) - - return suspendCancellableCoroutine { cont -> - this.cont = cont - - // Cancel all cpus on cancellation - cont.invokeOnCancellation { - this.cont = null - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - } + override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { + check(_ctx == null) { "A machine cannot run concurrently" } - engine.batch { workload.onStart(ctx) } - } + val ctx = Context(workload, meta) + ctx.start() + return ctx } - override fun close() { - if (isTerminated) { - return - } - - isTerminated = true - cancel() + override fun cancel() { + _ctx?.close() } override fun onConverge(now: Long, delta: Long) { @@ -121,29 +92,35 @@ public abstract class SimAbstractMachine( } /** - * Cancel the workload that is currently running on the machine. + * The execution context in which the workload runs. + * + * @param workload The workload that is running on the machine. + * @param meta The metadata passed to the workload. 
*/ - private fun cancel() { - engine.batch { - for (cpu in cpus) { - cpu.cancel() + private inner class Context( + private val workload: SimWorkload, + override val meta: Map<String, Any> + ) : SimMachineContext { + /** + * A flag to indicate that the context has been closed. + */ + private var isClosed = false + + override val engine: FlowEngine = this@SimAbstractMachine.engine + + /** + * Start this context. + */ + fun start() { + try { + _ctx = this + engine.batch { workload.onStart(this) } + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStart callback" } + close() } } - val cont = cont - if (cont != null) { - this.cont = null - cont.resume(Unit) - } - } - - /** - * The execution context in which the workload runs. - */ - private inner class Context(override val meta: Map<String, Any>) : SimMachineContext { - override val engine: FlowEngine - get() = this@SimAbstractMachine.engine - override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus override val memory: SimMemory = this@SimAbstractMachine.memory @@ -152,7 +129,49 @@ public abstract class SimAbstractMachine( override val storage: List<SimStorageInterface> = this@SimAbstractMachine.storage - override fun close() = cancel() + override fun close() { + if (isClosed) { + return + } + + isClosed = true + assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" } + _ctx = null + + // Cancel all the resources associated with the machine + doCancel() + + try { + workload.onStop(this) + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStop callback" } + } + } + + /** + * Run the stop procedures for the resources associated with the machine. 
+ */ + private fun doCancel() { + engine.batch { + for (cpu in cpus) { + cpu.cancel() + } + + memory.cancel() + + for (ifx in net) { + (ifx as NetworkAdapterImpl).disconnect() + } + + for (storage in storage) { + val impl = storage as StorageDeviceImpl + impl.read.cancel() + impl.write.cancel() + } + } + } + + override fun toString(): String = "SimAbstractMachine.Context" } /** @@ -166,7 +185,7 @@ public abstract class SimAbstractMachine( * The [SimNetworkAdapter] implementation for a machine. */ private class NetworkAdapterImpl( - private val engine: FlowEngine, + engine: FlowEngine, model: NetworkAdapter, index: Int ) : SimNetworkAdapter(), SimNetworkInterface { @@ -208,4 +227,12 @@ public abstract class SimAbstractMachine( override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]" } + + private companion object { + /** + * The logging instance associated with this class. + */ + @JvmStatic + private val logger = KotlinLogging.logger {} + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ab0b56ae..94581e89 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A generic machine that is able to run a [SimWorkload]. */ -public interface SimMachine : AutoCloseable { +public interface SimMachine { /** * The model of the machine containing its specifications. */ @@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable { public val peripherals: List<SimPeripheral> /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. 
+ * Start the specified [SimWorkload] on this machine. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) + public fun startWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()): SimMachineContext /** - * Terminate this machine. + * Cancel the workload that is currently running on this machine. + * + * If no workload is active, this operation is a no-op. */ - public override fun close() + public fun cancel() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index f6d8f628..07465126 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -28,7 +28,9 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -92,13 +94,20 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf<ScalingGovernor.Logic>() /* SimHypervisor */ - override fun 
createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { + override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } val vm = VirtualMachine(model, interferenceId) _vms.add(vm) return vm } + override fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx @@ -121,6 +130,8 @@ public abstract class SimAbstractHypervisor( } } + override fun onStop(ctx: SimMachineContext) {} + private var _cpuCount = 0 private var _cpuCapacity = 0.0 @@ -141,33 +152,31 @@ public abstract class SimAbstractHypervisor( * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + private inner class VirtualMachine( + model: MachineModel, + interferenceId: String? = null + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + /** * The interference key of this virtual machine. */ - private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) } + private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) } + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) } /** * The resource counters associated with the hypervisor. 
*/ override val counters: SimHypervisorCounters get() = _counters - private val _counters = object : SimHypervisorCounters { - private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 - - override val cpuActiveTime: Long - get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() - override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() - override val cpuStealTime: Long - get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() - override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong() - } + private val _counters = VmCountersImpl(cpus) /** * The CPU capacity of the hypervisor in MHz. @@ -187,14 +196,58 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) + override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + joinInterferenceDomain() + workload.onStart(ctx) + } catch (cause: Throwable) { + leaveInterferenceDomain() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + leaveInterferenceDomain() + workload.onStop(ctx) + } + }, + meta + ) + } + override fun close() { - super.close() + if (isClosed) { + return + } + + isClosed = true + cancel() for (cpu in cpus) { cpu.close() } + } - _vms.remove(this) + /** + * Join the interference domain of the hypervisor. + */ + private fun joinInterferenceDomain() { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + } + + /** + * Leave the interference domain of the hypervisor. 
+ */ + private fun leaveInterferenceDomain() { + val interferenceKey = interferenceKey if (interferenceKey != null) { interferenceDomain?.leave(interferenceKey) } @@ -211,9 +264,7 @@ public abstract class SimAbstractHypervisor( ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity - set(_) { - // Ignore capacity changes - } + set(_) = TODO("Capacity changes on vCPU not supported") override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" @@ -287,4 +338,20 @@ public abstract class SimAbstractHypervisor( cpuTime[3] += (interferenceDelta * d).roundToLong() } } + + /** + * A [SimHypervisorCounters] implementation for a virtual machine. + */ + private class VmCountersImpl(private val cpus: List<VCpu>) : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() + override val cpuIdleTime: Long + get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + override val cpuStealTime: Long + get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() + override val cpuLostTime: Long + get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 57d4cf20..a69f419f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload { * @param model The machine to create. * @param interferenceId An identifier for the interference model. 
*/ - public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + + /** + * Remove the specified [machine] from the hypervisor. + * + * @param machine The machine to remove. + */ + public fun removeMachine(machine: SimVirtualMachine) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index b737d61a..09b03306 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey */ public interface VmInterferenceDomain : InterferenceDomain { /** - * Join this interference domain. + * Construct an [InterferenceKey] for the specified [id]. * * @param id The identifier of the virtual machine. + * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine + * does not participate in the domain. */ - public fun join(id: String): InterferenceKey + public fun createKey(id: String): InterferenceKey? /** - * Leave this interference domain. + * Remove the specified [key] from this domain. + */ + public fun removeKey(key: InterferenceKey) + + /** + * Mark the specified [key] as active in this interference domain. + * + * @param key The key to join the interference domain with. + */ + public fun join(key: InterferenceKey) + + /** + * Mark the specified [key] as inactive in this interference domain. + * + * @param key The key of the virtual machine that wants to leave the domain. 
*/ public fun leave(key: InterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index b3d72507..977292be 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -28,143 +28,366 @@ import java.util.* /** * An interference model that models the resource interference between virtual machines on a host. * - * @param groups The groups of virtual machines that interfere with each other. - * @param random The [Random] instance to select the affected virtual machines. + * @param targets The target load of each group. + * @param scores The performance score of each group. + * @param members The members belonging to each group. + * @param membership The identifier of each key. + * @param size The number of groups. + * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ -public class VmInterferenceModel( - private val groups: List<VmInterferenceGroup>, - private val random: Random = Random(0) +public class VmInterferenceModel private constructor( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map<String, Int>, + private val members: Array<IntArray>, + private val membership: Array<IntArray>, + private val size: Int, + seed: Long, ) { /** + * A [SplittableRandom] used for selecting the virtual machines that are affected. + */ + private val random = SplittableRandom(seed) + + /** * Construct a new [VmInterferenceDomain]. 
*/ - public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain { + public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random) + + /** + * Create a copy of this model with a different seed. + */ + public fun withSeed(seed: Long): VmInterferenceModel { + return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed) + } + + public companion object { /** - * The stateful groups of this domain. + * Construct a [Builder] instance. */ - private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) } + @JvmStatic + public fun builder(): Builder = Builder() + } + /** + * Builder class for a [VmInterferenceModel] + */ + public class Builder internal constructor() { /** - * The set of keys active in this domain. + * The initial capacity of the builder. */ - private val keys = mutableSetOf<InterferenceKeyImpl>() + private val INITIAL_CAPACITY = 256 - override fun join(id: String): InterferenceKey { - val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad }) - keys += key - return key - } + /** + * The target load of each group. + */ + private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } - override fun leave(key: InterferenceKey) { - if (key is InterferenceKeyImpl) { - keys -= key - key.leave() + /** + * The performance score of each group. + */ + private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } + + /** + * The members of each group. + */ + private var _members = ArrayList<Set<String>>(INITIAL_CAPACITY) + + /** + * The mapping from member to group id. + */ + private val ids = TreeSet<String>() + + /** + * The number of groups in the model. + */ + private var size = 0 + + /** + * Add the specified group to the model. 
+ */ + public fun addGroup(members: Set<String>, targetLoad: Double, score: Double): Builder { + val size = size + + if (size == _targets.size) { + grow() } + + _targets[size] = targetLoad + _scores[size] = score + _members.add(members) + ids.addAll(members) + + this.size++ + + return this } - override fun apply(key: InterferenceKey?, load: Double): Double { - if (key == null || key !is InterferenceKeyImpl) { - return 1.0 - } + /** + * Build the [VmInterferenceModel]. + */ + public fun build(seed: Long = 0): VmInterferenceModel { + val size = size + val targets = _targets + val scores = _scores + val members = _members - val ctx = key.findGroup(load) - val group = ctx?.group + val indices = Array(size) { it } + indices.sortWith( + Comparator { l, r -> + var cmp = targets[l].compareTo(targets[r]) // Order by target load + if (cmp != 0) { + return@Comparator cmp + } - // Apply performance penalty to (on average) only one of the VMs - return if (group != null && random.nextInt(group.members.size) == 0) { - group.score - } else { - 1.0 + cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first) + if (cmp != 0) + cmp + else + l.compareTo(r) + } + ) + + val newTargets = DoubleArray(size) + val newScores = DoubleArray(size) + val newMembers = arrayOfNulls<IntArray>(size) + + var nextId = 0 + val idMapping = ids.associateWith { nextId++ } + val membership = ids.associateWithTo(TreeMap()) { ArrayList<Int>() } + + for ((group, j) in indices.withIndex()) { + newTargets[group] = targets[j] + newScores[group] = scores[j] + val groupMembers = members[j] + val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray() + + newGroupMembers.sort() + newMembers[group] = newGroupMembers + + for (member in groupMembers) { + membership.getValue(member).add(group) + } } + + @Suppress("UNCHECKED_CAST") + return VmInterferenceModel( + newTargets, + newScores, + idMapping, + newMembers as Array<IntArray>, + membership.map { 
it.value.toIntArray() }.toTypedArray(), + size, + seed + ) } - override fun toString(): String = "VmInterferenceDomain" + /** + * Helper function to grow the capacity of the internal arrays. + */ + private fun grow() { + val oldSize = _targets.size + val newSize = oldSize + (oldSize shr 1) + + _targets = _targets.copyOf(newSize) + _scores = _scores.copyOf(newSize) + } } /** - * An interference key. - * - * @param id The identifier of the member. - * @param groups The groups to which the key belongs. + * Internal implementation of [VmInterferenceDomain]. */ - private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey { - init { - for (group in groups) { - group.join(this) - } - } + private class InterferenceDomainImpl( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map<String, Int>, + private val members: Array<IntArray>, + private val membership: Array<IntArray>, + private val random: SplittableRandom + ) : VmInterferenceDomain { + /** + * Keys registered with this domain. + */ + private val keys = HashMap<Int, InterferenceKeyImpl>() /** - * Find the active group that applies for the interference member. + * The set of keys active in this domain. */ - fun findGroup(load: Double): GroupContext? { - // Find the first active group whose target load is lower than the current load - val index = groups.binarySearchBy(load) { it.group.targetLoad } - val target = if (index >= 0) index else -(index + 1) + private val activeKeys = ArrayList<InterferenceKeyImpl>() - // Check whether there are active groups ahead of the index - for (i in target until groups.size) { - val group = groups[i] - if (group.group.targetLoad > load) { - break - } else if (group.isActive) { - return group + override fun createKey(id: String): InterferenceKey? 
{ + val intId = idMapping[id] ?: return null + return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } + } + + override fun removeKey(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (activeKeys.remove(key)) { + computeActiveGroups(key.id) + } + + keys.remove(key.id) + } + + override fun join(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (key.acquire()) { + val pos = activeKeys.binarySearch(key) + if (pos < 0) { + activeKeys.add(-pos - 1, key) } + computeActiveGroups(key.id) } + } + + override fun leave(key: InterferenceKey) { + if (key is InterferenceKeyImpl && key.release()) { + activeKeys.remove(key) + computeActiveGroups(key.id) + } + } + + override fun apply(key: InterferenceKey?, load: Double): Double { + if (key == null || key !is InterferenceKeyImpl) { + return 1.0 + } + + val groups = key.groups + val groupSize = groups.size + + if (groupSize == 0) { + return 1.0 + } + + val targets = targets + val scores = scores + var low = 0 + var high = groups.size - 1 - // Check whether there are active groups before the index - for (i in (target - 1) downTo 0) { - val group = groups[i] - if (group.isActive) { - return group + var group = -1 + var score = 1.0 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + score = scores[midGroup] + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + score = scores[midGroup] + break } } - return null + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + score + } else { + 1.0 + } } + override fun toString(): String = "VmInterferenceDomain" + /** - * Leave all the groups. + * Queue of participants that will be removed or added to the active groups. 
*/ - fun leave() { + private val _participants = ArrayDeque<InterferenceKeyImpl>() + + /** + * (Re-)Compute the active groups. + */ + private fun computeActiveGroups(id: Int) { + val activeKeys = activeKeys + val groups = membership[id] + + if (activeKeys.isEmpty()) { + return + } + + val members = members + val participants = _participants + for (group in groups) { - group.leave(this) + val groupMembers = members[group] + + var i = 0 + var j = 0 + var intersection = 0 + + // Compute the intersection of the group members and the current active members + while (i < groupMembers.size && j < activeKeys.size) { + val l = groupMembers[i] + val rightEntry = activeKeys[j] + val r = rightEntry.id + + if (l < r) { + i++ + } else if (l > r) { + j++ + } else { + participants.add(rightEntry) + intersection++ + + i++ + j++ + } + } + + while (true) { + val participant = participants.poll() ?: break + val participantGroups = participant.groups + if (intersection <= 1) { + participantGroups.remove(group) + } else { + val pos = participantGroups.binarySearch(group) + if (pos < 0) { + participantGroups.add(-pos - 1, group) + } + } + } } } } /** - * A group context is used to track the active keys per interference group. + * An interference key. + * + * @param id The identifier of the member. */ - private inner class GroupContext(val group: VmInterferenceGroup) { + private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> { /** - * The active keys that are part of this group. + * The active groups to which the key belongs. */ - private val keys = mutableSetOf<InterferenceKeyImpl>() + @JvmField val groups: MutableList<Int> = ArrayList() /** - * A flag to indicate that the group is active. + * The number of users of the interference key. */ - val isActive - get() = keys.size > 1 + private var refCount: Int = 0 /** - * Determine whether the specified [id] is part of this group. + * Join the domain. 
*/ - operator fun contains(id: String): Boolean = id in group.members + fun acquire(): Boolean = refCount++ <= 0 /** - * Join this group with the specified [key]. + * Leave the domain. */ - fun join(key: InterferenceKeyImpl) { - keys += key - } + fun release(): Boolean = --refCount <= 0 - /** - * Leave this group with the specified [key]. - */ - fun leave(key: InterferenceKeyImpl) { - keys -= key - } + override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 99f4a1e1..726d1f56 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -48,5 +48,7 @@ public class SimFlopsWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 2ef3bc43..8a3f5f84 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -48,5 +48,7 @@ public class SimRuntimeWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" } diff --git 
a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 53c98409..ce04a790 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimTraceWorkload" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index b80665fa..61c6e2ad 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -37,4 +37,11 @@ public interface SimWorkload { * @param ctx The execution context in which the machine runs. */ public fun onStart(ctx: SimMachineContext) + + /** + * This method is invoked when the workload is stopped. + * + * @param ctx The execution context in which the machine runs. 
+ */ + public fun onStop(ctx: SimMachineContext) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index cc4f1f6a..742470a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { /** * The resource consumers which represent the lifecycle of the workload. */ - private val waiting = mutableSetOf<FlowSource>() + private val waiting = HashSet<Wrapper>() /** - * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. + * Wait for the specified [source] to complete before ending the lifecycle of the workload. */ - public fun waitFor(consumer: FlowSource): FlowSource { - waiting.add(consumer) - return object : FlowSource by consumer { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - try { - consumer.onStop(conn, now, delta) - } finally { - complete(consumer) - } - } - override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" - } + public fun waitFor(source: FlowSource): FlowSource { + val wrapper = Wrapper(source) + waiting.add(wrapper) + return wrapper } /** - * Complete the specified [FlowSource]. + * Complete the specified [Wrapper]. */ - private fun complete(consumer: FlowSource) { - if (waiting.remove(consumer) && waiting.isEmpty()) { + private fun complete(wrapper: Wrapper) { + if (waiting.remove(wrapper) && waiting.isEmpty()) { ctx.close() } } + + /** + * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that it has completed. 
+ */ + private inner class Wrapper(private val delegate: FlowSource) : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + delegate.onStart(conn, now) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + delegate.onConverge(conn, now, delta) + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate.onStop(conn, now, delta) + } finally { + complete(this) + } + } + + override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]" + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 0bb24ed8..644eb497 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -65,14 +65,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two cores execute 1000 MFlOps per second (1000 ms) - assertEquals(1000, clock.millis()) - } finally { - machine.close() - } + // Two cores execute 1000 MFlOps per second (1000 ms) + assertEquals(1000, clock.millis()) } @Test @@ -88,14 +84,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two sockets with two cores execute 2000 MFlOps per second (500 ms) - assertEquals(500, clock.millis()) - } finally { - machine.close() - } + // Two sockets with two cores 
execute 2000 MFlOps per second (500 ms) + assertEquals(500, clock.millis()) } @Test @@ -109,16 +101,12 @@ class SimMachineTest { val source = SimPowerSource(engine, capacity = 1000.0) source.connect(machine.psu) - try { - coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - assertAll( - { assertEquals(100.0, machine.psu.powerDraw) }, - { assertEquals(100.0, source.powerDraw) } - ) - } - } finally { - machine.close() + coroutineScope { + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } + assertAll( + { assertEquals(100.0, machine.psu.powerDraw) }, + { assertEquals(100.0, source.powerDraw) } + ) } } @@ -130,22 +118,20 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val cpu = ctx.cpus[0] - - cpu.capacity = cpu.model.frequency + 1000.0 - assertEquals(cpu.model.frequency, cpu.capacity) - cpu.capacity = -1.0 - assertEquals(0.0, cpu.capacity) - - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val cpu = ctx.cpus[0] + + cpu.capacity = cpu.model.frequency + 1000.0 + assertEquals(cpu.model.frequency, cpu.capacity) + cpu.capacity = -1.0 + assertEquals(0.0, cpu.capacity) + + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -156,16 +142,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - assertEquals(32_000 * 4.0, ctx.memory.capacity) - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + assertEquals(32_000 * 4.0, ctx.memory.capacity) + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -176,18 +160,16 @@ class 
SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -202,19 +184,17 @@ class SimMachineTest { val adapter = (machine.peripherals[0] as SimNetworkAdapter) adapter.connect(SimNetworkSink(engine, adapter.bandwidth)) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val iface = ctx.net[0] - iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val iface = ctx.net[0] + iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -226,19 +206,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, 
clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -250,19 +228,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -275,13 +251,11 @@ class SimMachineTest { try { coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } cancel() } } catch (_: CancellationException) { // Ignore - } finally { - machine.close() } assertEquals(0, clock.millis()) @@ -295,31 +269,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - coroutineScope { - launch { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + coroutineScope { + launch { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) + } - assertThrows<IllegalStateException> { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + 
assertThrows<IllegalStateException> { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } - } finally { - machine.close() } } - - @Test - fun testClose() = runBlockingSimulation { - val machine = SimBareMetalMachine( - FlowEngine(coroutineContext, clock), - machineModel, - SimplePowerDriver(ConstantPowerModel(0.0)) - ) - - machine.close() - assertDoesNotThrow { machine.close() } - assertThrows<IllegalStateException> { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 6f32cf46..91855e8d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -38,6 +37,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment import 
org.opendc.simulator.compute.workload.SimTraceWorkload @@ -81,16 +81,16 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) println("Hypervisor finished") } yield() - val vm = hypervisor.createMachine(model) - vm.run(workloadA) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) yield() - machine.close() + machine.cancel() assertAll( { assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") }, @@ -132,22 +132,22 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, null, null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } yield() coroutineScope { launch { - val vm = hypervisor.createMachine(model) - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model) - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } yield() - machine.close() + machine.cancel() yield() assertAll( @@ -172,11 +172,11 @@ internal class SimFairShareHypervisorTest { assertDoesNotThrow { launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } } - machine.close() + machine.cancel() } @Test @@ -187,12 +187,11 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val groups = listOf( - VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")), - VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")), - VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) - ) - val interferenceModel = VmInterferenceModel(groups) + val interferenceModel = VmInterferenceModel.builder() + 
.addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")) + .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")) + .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) + .build() val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( @@ -221,20 +220,20 @@ internal class SimFairShareHypervisorTest { ) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } coroutineScope { launch { - val vm = hypervisor.createMachine(model, "a") - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model, "a") + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model, "b") - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model, "b") + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 02d308ff..823a0ae3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -36,6 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -76,13 +77,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = 
SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } - val vm = hypervisor.createMachine(machineModel) - vm.run(workloadA) + launch { machine.runWorkload(hypervisor) } + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workloadA) yield() - vm.close() - machine.close() + hypervisor.removeMachine(vm) + machine.cancel() assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } @@ -98,12 +99,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - vm.close() - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + hypervisor.removeMachine(vm) + + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -121,11 +123,11 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -142,19 +144,20 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(SimRuntimeWorkload(duration)) - vm.close() + val vm = hypervisor.newMachine(machineModel) + 
vm.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm) yield() - val vm2 = hypervisor.createMachine(machineModel) - vm2.run(SimRuntimeWorkload(duration)) - vm2.close() - machine.close() + val vm2 = hypervisor.newMachine(machineModel) + vm2.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm2) + + machine.cancel() assertEquals(duration * 2, clock.millis()) { "Took enough time" } } @@ -168,17 +171,17 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel) + hypervisor.newMachine(machineModel) assertAll( { assertFalse(hypervisor.canFit(machineModel)) }, - { assertThrows<IllegalArgumentException> { hypervisor.createMachine(machineModel) } } + { assertThrows<IllegalArgumentException> { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } /** @@ -192,16 +195,16 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel).close() + hypervisor.removeMachine(hypervisor.newMachine(machineModel)) assertAll( { assertTrue(hypervisor.canFit(machineModel)) }, - { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } + { assertDoesNotThrow { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index 574860e8..aa91984a 100644 --- 
a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -30,6 +30,7 @@ import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -67,13 +68,9 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -94,13 +91,9 @@ class SimTraceWorkloadTest { offset = 1000 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(5000, clock.millis()) - } finally { - machine.close() - } + assertEquals(5000, clock.millis()) } @Test @@ -121,14 +114,10 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - delay(1000L) - machine.run(workload) + delay(1000L) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -149,12 +138,8 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 04ba7f21..a7877546 100644 --- 
a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -62,13 +62,21 @@ public interface FlowMultiplexer { public val counters: FlowCounters /** - * Create a new input on this multiplexer. + * Create a new input on this multiplexer with a coupled capacity. * * @param key The key of the interference member to which the input belongs. */ public fun newInput(key: InterferenceKey? = null): FlowConsumer /** + * Create a new input on this multiplexer with the specified [capacity]. + * + * @param capacity The capacity of the input. + * @param key The key of the interference member to which the input belongs. + */ + public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer + + /** * Remove [input] from this multiplexer. */ public fun removeInput(input: FlowConsumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 125d10fe..b68a8baa 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -78,6 +78,8 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return input } + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key) + override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt 
b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index a0fb8a4e..3d26efda 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -86,7 +86,15 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) + return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key) + } + + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer { + return newInput(isCoupled = false, capacity, key) + } + + private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer { + val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity) _inputs.add(provider) return provider } @@ -206,7 +214,10 @@ public class MaxMinFlowMultiplexer( // Disable timers and convergence of the source if one of the output manages it input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput - input.capacity = capacity + + if (input.isCoupled) { + input.capacity = capacity + } trigger(_clock.millis()) } @@ -340,7 +351,9 @@ public class MaxMinFlowMultiplexer( capacity = newCapacity for (input in _activeInputs) { - input.capacity = newCapacity + if (input.isCoupled) { + input.capacity = newCapacity + } } // Sort outputs by their capacity @@ -495,6 +508,7 @@ public class MaxMinFlowMultiplexer( private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, @JvmField val key: InterferenceKey?, + @JvmField val isCoupled: Boolean, initialCapacity: Double, ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { /** diff --git 
a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index 738ec38b..418dc201 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -20,6 +20,8 @@ * SOFTWARE. */ +@file:Suppress("PropertyName") + package org.opendc.telemetry.compute import io.opentelemetry.api.common.AttributeKey @@ -30,10 +32,9 @@ import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.telemetry.compute.table.* import java.time.Instant -import kotlin.math.roundToLong /** - * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData]. + * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader]. 
*/ public class ComputeMetricAggregator { private val _service = ServiceAggregator() @@ -59,34 +60,28 @@ public class ComputeMetricAggregator { service.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> service.hostsUp = point.value.toInt() - "down" -> service.hostsDown = point.value.toInt() + "up" -> service._hostsUp = point.value.toInt() + "down" -> service._hostsDown = point.value.toInt() } } } "scheduler.servers" -> { for (point in metric.longSumData.points) { when (point.attributes[STATE_KEY]) { - "pending" -> service.serversPending = point.value.toInt() - "active" -> service.serversActive = point.value.toInt() + "pending" -> service._serversPending = point.value.toInt() + "active" -> service._serversActive = point.value.toInt() } } } "scheduler.attempts" -> { for (point in metric.longSumData.points) { when (point.attributes[RESULT_KEY]) { - "success" -> service.attemptsSuccess = point.value.toInt() - "failure" -> service.attemptsFailure = point.value.toInt() - "error" -> service.attemptsError = point.value.toInt() + "success" -> service._attemptsSuccess = point.value.toInt() + "failure" -> service._attemptsFailure = point.value.toInt() + "error" -> service._attemptsError = point.value.toInt() } } } - "scheduler.latency" -> { - for (point in metric.doubleHistogramData.points) { - val server = getServer(servers, point) ?: continue - server.schedulingLatency = (point.sum / point.count).roundToLong() - } - } // SimHost "system.guests" -> { @@ -94,10 +89,10 @@ public class ComputeMetricAggregator { for (point in metric.longSumData.points) { when (point.attributes[STATE_KEY]) { - "terminated" -> agg.guestsTerminated = point.value.toInt() - "running" -> agg.guestsRunning = point.value.toInt() - "error" -> agg.guestsRunning = point.value.toInt() - "invalid" -> agg.guestsInvalid = point.value.toInt() + "terminated" -> agg._guestsTerminated = point.value.toInt() + "running" -> agg._guestsRunning = point.value.toInt() + "error" -> agg._guestsRunning = 
point.value.toInt() + "invalid" -> agg._guestsInvalid = point.value.toInt() } } } @@ -108,24 +103,24 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { - server.cpuLimit = point.value - server.host = agg.host + server._cpuLimit = point.value + server._host = agg.host } else { - agg.cpuLimit = point.value + agg._cpuLimit = point.value } } } "system.cpu.usage" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuUsage = metric.doubleGaugeData.points.first().value + agg._cpuUsage = metric.doubleGaugeData.points.first().value } "system.cpu.demand" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuDemand = metric.doubleGaugeData.points.first().value + agg._cpuDemand = metric.doubleGaugeData.points.first().value } "system.cpu.utilization" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuUtilization = metric.doubleGaugeData.points.first().value + agg._cpuUtilization = metric.doubleGaugeData.points.first().value } "system.cpu.time" -> { val agg = getHost(hosts, resource) ?: continue @@ -135,29 +130,29 @@ public class ComputeMetricAggregator { val state = point.attributes[STATE_KEY] if (server != null) { when (state) { - "active" -> server.cpuActiveTime = point.value - "idle" -> server.cpuIdleTime = point.value - "steal" -> server.cpuStealTime = point.value - "lost" -> server.cpuLostTime = point.value + "active" -> server._cpuActiveTime = point.value + "idle" -> server._cpuIdleTime = point.value + "steal" -> server._cpuStealTime = point.value + "lost" -> server._cpuLostTime = point.value } - server.host = agg.host + server._host = agg.host } else { when (state) { - "active" -> agg.cpuActiveTime = point.value - "idle" -> agg.cpuIdleTime = point.value - "steal" -> agg.cpuStealTime = point.value - "lost" -> agg.cpuLostTime = point.value + "active" -> agg._cpuActiveTime = point.value + "idle" -> agg._cpuIdleTime = point.value + "steal" -> agg._cpuStealTime = point.value + "lost" -> 
agg._cpuLostTime = point.value } } } } "system.power.usage" -> { val agg = getHost(hosts, resource) ?: continue - agg.powerUsage = metric.doubleGaugeData.points.first().value + agg._powerUsage = metric.doubleGaugeData.points.first().value } "system.power.total" -> { val agg = getHost(hosts, resource) ?: continue - agg.powerTotal = metric.doubleSumData.points.first().value + agg._powerTotal = metric.doubleSumData.points.first().value } "system.time" -> { val agg = getHost(hosts, resource) ?: continue @@ -169,16 +164,16 @@ public class ComputeMetricAggregator { server.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> server.uptime = point.value - "down" -> server.downtime = point.value + "up" -> server._uptime = point.value + "down" -> server._downtime = point.value } - server.host = agg.host + server._host = agg.host } else { agg.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> agg.uptime = point.value - "down" -> agg.downtime = point.value + "up" -> agg._uptime = point.value + "down" -> agg._downtime = point.value } } } @@ -190,13 +185,20 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { - server.bootTime = point.value - server.host = agg.host + server._bootTime = Instant.ofEpochMilli(point.value) + server._host = agg.host } else { - agg.bootTime = point.value + agg._bootTime = Instant.ofEpochMilli(point.value) } } } + "system.time.provision" -> { + for (point in metric.longGaugeData.points) { + val server = getServer(servers, point) ?: continue + server.recordTimestamp(point) + server._provisionTime = Instant.ofEpochMilli(point.value) + } + } } } } @@ -205,14 +207,16 @@ public class ComputeMetricAggregator { * Collect the data via the [monitor]. 
*/ public fun collect(monitor: ComputeMonitor) { - monitor.record(_service.collect()) + monitor.record(_service) for (host in _hosts.values) { - monitor.record(host.collect()) + monitor.record(host) + host.reset() } for (server in _servers.values) { - monitor.record(server.collect()) + monitor.record(server) + server.reset() } } @@ -222,7 +226,7 @@ public class ComputeMetricAggregator { private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? { val id = resource.attributes[HOST_ID] return if (id != null) { - hosts.computeIfAbsent(id) { HostAggregator(resource) } + hosts.getOrPut(id) { HostAggregator(resource) } } else { null } @@ -234,7 +238,7 @@ public class ComputeMetricAggregator { private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? { val id = point.attributes[ResourceAttributes.HOST_ID] return if (id != null) { - servers.computeIfAbsent(id) { ServerAggregator(point.attributes) } + servers.getOrPut(id) { ServerAggregator(point.attributes) } } else { null } @@ -243,50 +247,55 @@ public class ComputeMetricAggregator { /** * An aggregator for service metrics before they are reported. */ - internal class ServiceAggregator { - private var timestamp = Long.MIN_VALUE + internal class ServiceAggregator : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp - @JvmField var hostsUp = 0 - @JvmField var hostsDown = 0 + override val hostsUp: Int + get() = _hostsUp + @JvmField var _hostsUp = 0 - @JvmField var serversPending = 0 - @JvmField var serversActive = 0 + override val hostsDown: Int + get() = _hostsDown + @JvmField var _hostsDown = 0 - @JvmField var attemptsSuccess = 0 - @JvmField var attemptsFailure = 0 - @JvmField var attemptsError = 0 + override val serversPending: Int + get() = _serversPending + @JvmField var _serversPending = 0 - /** - * Finish the aggregation for this cycle. 
- */ - fun collect(): ServiceData { - val now = Instant.ofEpochMilli(timestamp) - return toServiceData(now) - } + override val serversActive: Int + get() = _serversActive + @JvmField var _serversActive = 0 - /** - * Convert the aggregator state to an immutable [ServiceData]. - */ - private fun toServiceData(now: Instant): ServiceData { - return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) - } + override val attemptsSuccess: Int + get() = _attemptsSuccess + @JvmField var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + @JvmField var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + @JvmField var _attemptsError = 0 /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } /** * An aggregator for host metrics before they are reported. */ - internal class HostAggregator(resource: Resource) { + internal class HostAggregator(resource: Resource) : HostTableReader { /** * The static information about this host. 
*/ - val host = HostInfo( + override val host = HostInfo( resource.attributes[HOST_ID]!!, resource.attributes[HOST_NAME] ?: "", resource.attributes[HOST_ARCH] ?: "", @@ -294,111 +303,127 @@ public class ComputeMetricAggregator { resource.attributes[HOST_MEM_CAPACITY] ?: 0, ) - private var timestamp = Long.MIN_VALUE + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + @JvmField var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + @JvmField var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + @JvmField var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + @JvmField var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + @JvmField var _cpuLimit = 0.0 - @JvmField var guestsTerminated = 0 - @JvmField var guestsRunning = 0 - @JvmField var guestsError = 0 - @JvmField var guestsInvalid = 0 + override val cpuUsage: Double + get() = _cpuUsage + @JvmField var _cpuUsage = 0.0 - @JvmField var cpuLimit = 0.0 - @JvmField var cpuUsage = 0.0 - @JvmField var cpuDemand = 0.0 - @JvmField var cpuUtilization = 0.0 + override val cpuDemand: Double + get() = _cpuDemand + @JvmField var _cpuDemand = 0.0 - @JvmField var cpuActiveTime = 0L - @JvmField var cpuIdleTime = 0L - @JvmField var cpuStealTime = 0L - @JvmField var cpuLostTime = 0L + override val cpuUtilization: Double + get() = _cpuUtilization + @JvmField var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + @JvmField var _cpuActiveTime = 0L private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + @JvmField var _cpuIdleTime = 0L private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + @JvmField var _cpuStealTime = 0L private 
var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + @JvmField var _cpuLostTime = 0L private var previousCpuLostTime = 0L - @JvmField var powerUsage = 0.0 - @JvmField var powerTotal = 0.0 + override val powerUsage: Double + get() = _powerUsage + @JvmField var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + @JvmField var _powerTotal = 0.0 private var previousPowerTotal = 0.0 - @JvmField var uptime = 0L + override val uptime: Long + get() = _uptime - previousUptime + @JvmField var _uptime = 0L private var previousUptime = 0L - @JvmField var downtime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + @JvmField var _downtime = 0L private var previousDowntime = 0L - @JvmField var bootTime = Long.MIN_VALUE + + override val bootTime: Instant? + get() = _bootTime + @JvmField var _bootTime: Instant? = null /** * Finish the aggregation for this cycle. */ - fun collect(): HostData { - val now = Instant.ofEpochMilli(timestamp) - val data = toHostData(now) - + fun reset() { // Reset intermediate state for next aggregation - previousCpuActiveTime = cpuActiveTime - previousCpuIdleTime = cpuIdleTime - previousCpuStealTime = cpuStealTime - previousCpuLostTime = cpuLostTime - previousPowerTotal = powerTotal - previousUptime = uptime - previousDowntime = downtime - - guestsTerminated = 0 - guestsRunning = 0 - guestsError = 0 - guestsInvalid = 0 - - cpuLimit = 0.0 - cpuUsage = 0.0 - cpuDemand = 0.0 - cpuUtilization = 0.0 - - powerUsage = 0.0 - - return data - } - - /** - * Convert the aggregator state to an immutable [HostData] instance. 
- */ - private fun toHostData(now: Instant): HostData { - return HostData( - now, - host, - guestsTerminated, - guestsRunning, - guestsError, - guestsInvalid, - cpuLimit, - cpuUsage, - cpuDemand, - cpuUtilization, - cpuActiveTime - previousCpuActiveTime, - cpuIdleTime - previousCpuIdleTime, - cpuStealTime - previousCpuStealTime, - cpuLostTime - previousCpuLostTime, - powerUsage, - powerTotal - previousPowerTotal, - uptime - previousUptime, - downtime - previousDowntime, - if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null - ) + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 } /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } /** * An aggregator for server metrics before they are reported. */ - internal class ServerAggregator(attributes: Attributes) { + internal class ServerAggregator(attributes: Attributes) : ServerTableReader { /** * The static information about this server. */ - val server = ServerInfo( + override val server = ServerInfo( attributes[ResourceAttributes.HOST_ID]!!, attributes[ResourceAttributes.HOST_NAME]!!, attributes[ResourceAttributes.HOST_TYPE]!!, @@ -412,70 +437,76 @@ public class ComputeMetricAggregator { /** * The [HostInfo] of the host on which the server is hosted. */ - @JvmField var host: HostInfo? = null + override val host: HostInfo? + get() = _host + @JvmField var _host: HostInfo? 
= null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp - private var timestamp = Long.MIN_VALUE - @JvmField var uptime: Long = 0 + override val uptime: Long + get() = _uptime - previousUptime + @JvmField var _uptime: Long = 0 private var previousUptime = 0L - @JvmField var downtime: Long = 0 + + override val downtime: Long + get() = _downtime - previousDowntime + @JvmField var _downtime: Long = 0 private var previousDowntime = 0L - @JvmField var bootTime: Long = 0 - @JvmField var schedulingLatency = 0L - @JvmField var cpuLimit = 0.0 - @JvmField var cpuActiveTime = 0L - @JvmField var cpuIdleTime = 0L - @JvmField var cpuStealTime = 0L - @JvmField var cpuLostTime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + @JvmField var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + @JvmField var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + @JvmField var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + @JvmField var _cpuActiveTime = 0L private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + @JvmField var _cpuIdleTime = 0L private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + @JvmField var _cpuStealTime = 0L private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + @JvmField var _cpuLostTime = 0L private var previousCpuLostTime = 0L /** * Finish the aggregation for this cycle. 
*/ - fun collect(): ServerData { - val now = Instant.ofEpochMilli(timestamp) - val data = toServerData(now) - - previousUptime = uptime - previousDowntime = downtime + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime previousCpuActiveTime = cpuActiveTime previousCpuIdleTime = cpuIdleTime previousCpuStealTime = cpuStealTime previousCpuLostTime = cpuLostTime - host = null - cpuLimit = 0.0 - - return data - } - - /** - * Convert the aggregator state into an immutable [ServerData]. - */ - private fun toServerData(now: Instant): ServerData { - return ServerData( - now, - server, - host, - uptime - previousUptime, - downtime - previousDowntime, - if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null, - schedulingLatency, - cpuLimit, - cpuActiveTime - previousCpuActiveTime, - cpuIdleTime - previousCpuIdleTime, - cpuStealTime - previousCpuStealTime, - cpuLostTime - previousCpuLostTime - ) + _host = null + _cpuLimit = 0.0 } /** * Record the timestamp of a [point] for this aggregator. 
*/ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt index d51bcab4..64b5f337 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -22,26 +22,26 @@ package org.opendc.telemetry.compute -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader /** * A monitor that tracks the metrics and events of the OpenDC Compute service. */ public interface ComputeMonitor { /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: ServerData) {} + public fun record(reader: ServerTableReader) {} /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: HostData) {} + public fun record(reader: HostTableReader) {} /** - * Record the specified [data]. + * Record an entry with the specified [reader]. 
*/ - public fun record(data: ServiceData) {} + public fun record(reader: ServiceTableReader) {} } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index ce89061b..41315b15 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -24,6 +24,8 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.telemetry.compute.table.toServiceData /** * Collect the metrics of the compute service. @@ -32,8 +34,8 @@ public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { lateinit var serviceData: ServiceData val agg = ComputeMetricAggregator() val monitor = object : ComputeMonitor { - override fun record(data: ServiceData) { - serviceData = data + override fun record(reader: ServiceTableReader) { + serviceData = reader.toServiceData() } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt new file mode 100644 index 00000000..1e1ad94e --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, 
copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. 
+ */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant? +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt new file mode 100644 index 00000000..c23d1467 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice 
shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. + */ +public interface ServerTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs to. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the servers (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. 
+ */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index 6db1399d..39bf96f4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -37,3 +37,10 @@ public data class ServiceData( val attemptsFailure: Int, val attemptsError: Int ) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt index c79f0584..908f6748 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt @@ -20,26 +20,51 @@ * SOFTWARE. */ -package org.opendc.compute.workload.util +package org.opendc.telemetry.compute.table -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll +import java.time.Instant /** - * Test suite for the [PerformanceInterferenceReader] class. + * An interface that is used to read a row of a service trace entry. 
*/ -class PerformanceInterferenceReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val result = PerformanceInterferenceReader().read(input) - - assertAll( - { assertEquals(2, result.size) }, - { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, - { assertEquals(0.0, result[0].targetLoad, 0.001) }, - { assertEquals(0.8830158730158756, result[0].score, 0.001) } - ) - } +public interface ServiceTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. 
+ */ + public val attemptsError: Int } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 59308e11..a1bc869e 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -32,7 +32,7 @@ import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -131,7 +131,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.info { "Constructing performance interference model" } val workloadLoader = ComputeWorkloadLoader(tracePath) - val interferenceGroups = let { + val interferenceModel = let { val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") val operational = scenario.operationalPhenomena val enabled = operational.performanceInterferenceEnabled @@ -140,15 +140,14 @@ class RunnerCli : CliktCommand(name = "runner") { return@let null } - PerformanceInterferenceReader().read(path.inputStream()) + VmInterferenceModelReader().read(path.inputStream()) } val targets = portfolio.targets val results = (0 until targets.repeatsPerScenario).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } - runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel) + runRepeat(scenario, repeat, topology, 
workloadLoader, interferenceModel?.withSeed(repeat.toLong())) } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt index 7913660d..d39a0c74 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt @@ -24,8 +24,8 @@ package org.opendc.web.runner import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader import kotlin.math.max import kotlin.math.roundToLong @@ -33,24 +33,24 @@ import kotlin.math.roundToLong * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. 
*/ class WebComputeMetricExporter : ComputeMetricExporter() { - override fun record(data: HostData) { - val slices = data.downtime / SLICE_LENGTH + override fun record(reader: HostTableReader) { + val slices = reader.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalActiveTime + data.cpuActiveTime, - hostAggregateMetrics.totalIdleTime + data.cpuIdleTime, - hostAggregateMetrics.totalStealTime + data.cpuStealTime, - hostAggregateMetrics.totalLostTime + data.cpuLostTime, - hostAggregateMetrics.totalPowerDraw + data.powerTotal, + hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, + hostAggregateMetrics.totalStealTime + reader.cpuStealTime, + hostAggregateMetrics.totalLostTime + reader.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + reader.powerTotal, hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices + hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices ) - hostMetrics.compute(data.host.id) { _, prev -> + hostMetrics.compute(reader.host.id) { _, prev -> HostMetrics( - data.cpuUsage + (prev?.cpuUsage ?: 0.0), - data.cpuDemand + (prev?.cpuDemand ?: 0.0), - data.guestsRunning + (prev?.instanceCount ?: 0), + reader.cpuUsage + (prev?.cpuUsage ?: 0.0), + reader.cpuDemand + (prev?.cpuDemand ?: 0.0), + reader.guestsRunning + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) } @@ -79,13 +79,13 @@ class WebComputeMetricExporter : ComputeMetricExporter() { private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() - override fun record(data: ServiceData) { + override fun record(reader: ServiceTableReader) { serviceMetrics = AggregateServiceMetrics( - max(data.attemptsSuccess, serviceMetrics.vmTotalCount), - max(data.serversPending, serviceMetrics.vmWaitingCount), - max(data.serversActive, serviceMetrics.vmActiveCount), + 
max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), + max(reader.serversPending, serviceMetrics.vmWaitingCount), + max(reader.serversActive, serviceMetrics.vmActiveCount), max(0, serviceMetrics.vmInactiveCount), - max(data.attemptsFailure, serviceMetrics.vmFailedCount), + max(reader.attemptsFailure, serviceMetrics.vmFailedCount), ) } |
