summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt22
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt)32
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt (renamed from opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt)38
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt16
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt65
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt66
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt10
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt28
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt13
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt4
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt18
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt23
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt15
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt68
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt128
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt (renamed from opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt)28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt8
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt25
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt7
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt4
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt32
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt69
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt147
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt17
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt107
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt22
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt385
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt53
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt211
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt57
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt63
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt35
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt20
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt409
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt6
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt125
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt90
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt7
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt (renamed from opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt)61
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt9
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt38
58 files changed, 1738 insertions, 943 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
index fc092a3f..f3b94e3d 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
@@ -25,7 +25,12 @@ package org.opendc.compute.service.driver
/**
* Describes the static machine properties of the host.
*
+ * @property cpuCapacity The total CPU capacity of the host in MHz.
* @property cpuCount The number of logical processing cores available for this host.
- * @property memorySize The amount of memory available for this host in MB.
+ * @property memoryCapacity The amount of memory available for this host in MB.
*/
-public data class HostModel(public val cpuCount: Int, public val memorySize: Long)
+public data class HostModel(
+ public val cpuCapacity: Double,
+ public val cpuCount: Int,
+ public val memoryCapacity: Long
+)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 57e70fcd..292feabe 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -26,6 +26,7 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -173,6 +174,12 @@ internal class ComputeServiceImpl(
result.observe(available, upState)
result.observe(total - available, downState)
}
+
+ meter.gaugeBuilder("system.time.provision")
+ .setDescription("The most recent timestamp where the server entered a provisioned state")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectProvisionTime)
}
override fun newClient(): ComputeClient {
@@ -298,7 +305,7 @@ internal class ComputeServiceImpl(
val hv = HostView(host)
maxCores = max(maxCores, host.model.cpuCount)
- maxMemory = max(maxMemory, host.model.memorySize)
+ maxMemory = max(maxMemory, host.model.memoryCapacity)
hostToView[host] = hv
if (host.state == HostState.UP) {
@@ -324,8 +331,10 @@ internal class ComputeServiceImpl(
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
+ val now = clock.millis()
+ val request = SchedulingRequest(server, now)
- val request = SchedulingRequest(server, clock.millis())
+ server.lastProvisioningTimestamp = now
queue.add(request)
_serversPending.add(1)
requestSchedulingCycle()
@@ -501,4 +510,13 @@ internal class ComputeServiceImpl(
requestSchedulingCycle()
}
}
+
+ /**
+ * Collect the timestamp when each server entered its provisioning state most recently.
+ */
+ private fun collectProvisionTime(result: ObservableLongMeasurement) {
+ for ((_, server) in servers) {
+ result.observe(server.lastProvisioningTimestamp, server.attributes)
+ }
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
index e2f33f11..0876209a 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
@@ -37,7 +37,7 @@ public class HostView(public val host: Host) {
get() = host.uid
public var instanceCount: Int = 0
- public var availableMemory: Long = host.model.memorySize
+ public var availableMemory: Long = host.model.memoryCapacity
public var provisionedCores: Int = 0
override fun toString(): String = "HostView[host=$host]"
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index 05a7e1bf..f1b92c66 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -55,7 +55,7 @@ internal class InternalServer(
/**
* The attributes of a server.
*/
- internal val attributes: Attributes = Attributes.builder()
+ @JvmField internal val attributes: Attributes = Attributes.builder()
.put(ResourceAttributes.HOST_NAME, name)
.put(ResourceAttributes.HOST_ID, uid.toString())
.put(ResourceAttributes.HOST_TYPE, flavor.name)
@@ -70,7 +70,12 @@ internal class InternalServer(
/**
* The [Host] that has been assigned to host the server.
*/
- internal var host: Host? = null
+ @JvmField internal var host: Host? = null
+
+ /**
+ * The most recent timestamp when the server entered a provisioning state.
+ */
+ @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE
/**
* The current scheduling request.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
index a470a453..8a7a646c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
@@ -34,7 +34,7 @@ public class RamFilter(private val allocationRatio: Double) : HostFilter {
override fun test(host: HostView, server: Server): Boolean {
val requested = server.flavor.memorySize
val available = host.availableMemory
- val total = host.host.model.memorySize
+ val total = host.host.model.memoryCapacity
// Do not allow an instance to overcommit against itself, only against
// other instances.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt
index 708ddede..791710c8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt
@@ -20,25 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.kernel.interference
+package org.opendc.compute.service.scheduler.filters
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
/**
- * A group of virtual machines that together can interfere when operating on the same resources, causing performance
- * variability.
+ * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [Server] and the available
+ * capacity on the host.
*/
-public data class VmInterferenceGroup(
- /**
- * The minimum load of the host before the interference occurs.
- */
- public val targetLoad: Double,
-
- /**
- * A score in [0, 1] representing the performance variability as a result of resource interference.
- */
- public val score: Double,
+public class VCpuCapacityFilter : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double
+ val hostModel = host.host.model
+ val availableCapacity = hostModel.cpuCapacity / hostModel.cpuCount
- /**
- * The members of this interference group.
- */
- public val members: Set<String>
-)
+ return requiredCapacity == null || availableCapacity >= (requiredCapacity / server.flavor.cpuCount)
+ }
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt
index 8e787b97..a86226e2 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt
@@ -20,31 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.telemetry.compute.table
+package org.opendc.compute.service.scheduler.weights
-import java.time.Instant
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
/**
- * A trace entry for a particular host.
+ * A [HostWeigher] that weighs the hosts based on the difference required vCPU capacity and the available CPU capacity.
*/
-public data class HostData(
- val timestamp: Instant,
- val host: HostInfo,
- val guestsTerminated: Int,
- val guestsRunning: Int,
- val guestsError: Int,
- val guestsInvalid: Int,
- val cpuLimit: Double,
- val cpuUsage: Double,
- val cpuDemand: Double,
- val cpuUtilization: Double,
- val cpuActiveTime: Long,
- val cpuIdleTime: Long,
- val cpuStealTime: Long,
- val cpuLostTime: Long,
- val powerUsage: Double,
- val powerTotal: Double,
- val uptime: Long,
- val downtime: Long,
- val bootTime: Instant?
-)
+public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher {
+
+ override fun getWeight(host: HostView, server: Server): Double {
+ val model = host.host.model
+ val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double ?: 0.0
+ return model.cpuCapacity / model.cpuCount - requiredCapacity / server.flavor.cpuCount
+ }
+
+ override fun toString(): String = "VCpuWeigher"
+}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 564f9493..7b8d0fe2 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -125,7 +125,7 @@ internal class ComputeServiceTest {
fun testAddHost() = scope.runBlockingSimulation {
val host = mockk<Host>(relaxUnitFun = true)
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
assertEquals(0, service.hostCount)
@@ -147,7 +147,7 @@ internal class ComputeServiceTest {
fun testAddHostDouble() = scope.runBlockingSimulation {
val host = mockk<Host>(relaxUnitFun = true)
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.DOWN
assertEquals(0, service.hostCount)
@@ -216,7 +216,7 @@ internal class ComputeServiceTest {
fun testServerCannotFitOnHost() = scope.runBlockingSimulation {
val host = mockk<Host>(relaxUnitFun = true)
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns false
@@ -241,7 +241,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.DOWN
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
every { host.canFit(any()) } returns false
@@ -272,7 +272,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
every { host.canFit(any()) } returns false
@@ -303,7 +303,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
@@ -326,7 +326,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
@@ -369,7 +369,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
index cafd4498..3f2ce43b 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
@@ -33,10 +33,7 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.driver.HostModel
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.internal.HostView
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.InstanceCountFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.filters.*
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
@@ -183,12 +180,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.availableMemory } returns 512
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.availableMemory } returns 2048
scheduler.addHost(hostA)
@@ -210,7 +207,7 @@ internal class FilterSchedulerTest {
val host = mockk<HostView>()
every { host.host.state } returns HostState.UP
- every { host.host.model } returns HostModel(4, 2048)
+ every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.availableMemory } returns 2048
scheduler.addHost(host)
@@ -231,12 +228,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.provisionedCores } returns 3
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.provisionedCores } returns 0
scheduler.addHost(hostA)
@@ -258,7 +255,7 @@ internal class FilterSchedulerTest {
val host = mockk<HostView>()
every { host.host.state } returns HostState.UP
- every { host.host.model } returns HostModel(4, 2048)
+ every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.provisionedCores } returns 0
scheduler.addHost(host)
@@ -271,6 +268,34 @@ internal class FilterSchedulerTest {
}
@Test
+ fun testVCpuCapacityFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuCapacityFilter()),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(8 * 2600.0, 8, 2048)
+ every { hostA.availableMemory } returns 512
+ scheduler.addHost(hostA)
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4 * 3200.0, 4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+ every { server.flavor.meta } returns mapOf("cpu-capacity" to 2 * 3200.0)
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
fun testInstanceCountFilter() {
val scheduler = FilterScheduler(
filters = listOf(InstanceCountFilter(limit = 2)),
@@ -279,12 +304,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.instanceCount } returns 2
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.instanceCount } returns 0
scheduler.addHost(hostA)
@@ -306,12 +331,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.availableMemory } returns 1024
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.availableMemory } returns 512
scheduler.addHost(hostA)
@@ -333,12 +358,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(12, 2048)
+ every { hostA.host.model } returns HostModel(12 * 2600.0, 12, 2048)
every { hostA.availableMemory } returns 1024
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.availableMemory } returns 512
scheduler.addHost(hostA)
@@ -360,12 +385,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.provisionedCores } returns 2
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.provisionedCores } returns 0
scheduler.addHost(hostA)
@@ -387,12 +412,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.instanceCount } returns 2
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.instanceCount } returns 0
scheduler.addHost(hostA)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index b9d02185..908a58e9 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -47,6 +47,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -121,7 +122,7 @@ public class SimHost(
field = value
}
- override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
+ override val model: HostModel = HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size })
/**
* The [GuestListener] that listens for guest events.
@@ -136,11 +137,6 @@ public class SimHost(
}
}
- /**
- * The [Job] that represents the machine running the hypervisor.
- */
- private var _job: Job? = null
-
init {
launch()
@@ -188,7 +184,7 @@ public class SimHost(
}
override fun canFit(server: Server): Boolean {
- val sufficientMemory = model.memorySize >= server.flavor.memorySize
+ val sufficientMemory = model.memoryCapacity >= server.flavor.memorySize
val enoughCpus = model.cpuCount >= server.flavor.cpuCount
val canFit = hypervisor.canFit(server.flavor.toMachineModel())
@@ -199,11 +195,12 @@ public class SimHost(
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name)
val newGuest = Guest(
scope.coroutineContext,
clock,
this,
+ hypervisor,
mapper,
guestListener,
server,
@@ -249,7 +246,7 @@ public class SimHost(
override fun close() {
reset()
scope.cancel()
- machine.close()
+ machine.cancel()
}
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
@@ -276,26 +273,39 @@ public class SimHost(
}
/**
+ * The [Job] that represents the machine running the hypervisor.
+ */
+ private var _ctx: SimMachineContext? = null
+
+ /**
* Launch the hypervisor.
*/
private fun launch() {
- check(_job == null) { "Concurrent hypervisor running" }
+ check(_ctx == null) { "Concurrent hypervisor running" }
// Launch hypervisor onto machine
- _job = scope.launch {
- try {
- _bootTime = clock.millis()
- _state = HostState.UP
- machine.run(hypervisor, emptyMap())
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- logger.error(cause) { "Host failed" }
- throw cause
- } finally {
- _state = HostState.DOWN
+ _ctx = machine.startWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ _bootTime = clock.millis()
+ _state = HostState.UP
+ hypervisor.onStart(ctx)
+ } catch (cause: Throwable) {
+ _state = HostState.DOWN
+ _ctx = null
+ throw cause
+ }
}
- }
+
+ override fun onStop(ctx: SimMachineContext) {
+ try {
+ hypervisor.onStop(ctx)
+ } finally {
+ _state = HostState.DOWN
+ _ctx = null
+ }
+ }
+ })
}
/**
@@ -305,12 +315,7 @@ public class SimHost(
updateUptime()
// Stop the hypervisor
- val job = _job
- if (job != null) {
- job.cancel()
- _job = null
- }
-
+ _ctx?.close()
_state = HostState.DOWN
}
@@ -319,8 +324,9 @@ public class SimHost(
*/
private fun Flavor.toMachineModel(): MachineModel {
val originalCpu = machine.model.cpus[0]
+ val cpuCapacity = (this.meta["cpu-capacity"] as? Double ?: Double.MAX_VALUE).coerceAtMost(originalCpu.frequency)
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
- val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) }
val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
return MachineModel(processingUnits, memoryUnits).optimize()
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 5ea1860d..9f3122db 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -34,7 +34,9 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimVirtualMachine
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -46,6 +48,7 @@ internal class Guest(
context: CoroutineContext,
private val clock: Clock,
val host: SimHost,
+ private val hypervisor: SimHypervisor,
private val mapper: SimWorkloadMapper,
private val listener: GuestListener,
val server: Server,
@@ -114,8 +117,7 @@ internal class Guest(
stop()
state = ServerState.DELETED
-
- machine.close()
+ hypervisor.removeMachine(machine)
scope.cancel()
}
@@ -191,7 +193,7 @@ internal class Guest(
*/
private suspend fun runMachine(workload: SimWorkload) {
delay(1) // TODO Introduce model for boot time
- machine.run(workload, mapOf("driver" to host, "server" to server))
+ machine.runWorkload(workload, mapOf("driver" to host, "server" to server))
}
/**
@@ -248,7 +250,7 @@ internal class Guest(
*/
fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result.observe(_bootTime)
+ result.observe(_bootTime, attributes)
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index a0ff9228..799a8cf0 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -46,8 +46,8 @@ import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.HOST_ID
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Duration
@@ -140,10 +140,10 @@ internal class SimHostTest {
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
object : ComputeMetricExporter() {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- stealTime += data.cpuStealTime
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ stealTime += reader.cpuStealTime
}
},
exportInterval = Duration.ofSeconds(duration)
@@ -236,16 +236,16 @@ internal class SimHostTest {
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
object : ComputeMetricExporter() {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- uptime += data.uptime
- downtime += data.downtime
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ uptime += reader.uptime
+ downtime += reader.downtime
}
- override fun record(data: ServerData) {
- guestUptime += data.uptime
- guestDowntime += data.downtime
+ override fun record(reader: ServerTableReader) {
+ guestUptime += reader.uptime
+ guestDowntime += reader.downtime
}
},
exportInterval = Duration.ofSeconds(duration)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 1a6624f7..f23becda 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -92,7 +92,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val idCol = reader.resolve(RESOURCE_ID)
val startTimeCol = reader.resolve(RESOURCE_START_TIME)
val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
- val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY)
val memCol = reader.resolve(RESOURCE_MEM_CAPACITY)
var counter = 0
@@ -108,8 +109,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val submissionTime = reader.get(startTimeCol) as Instant
val endTime = reader.get(stopTimeCol) as Instant
- val maxCores = reader.getInt(coresCol)
- val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
+ val cpuCount = reader.getInt(cpuCountCol)
+ val cpuCapacity = reader.getDouble(cpuCapacityCol)
+ val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val builder = fragments.getValue(id)
@@ -119,8 +121,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
VirtualMachine(
uid,
id,
- maxCores,
- requiredMemory.roundToLong(),
+ cpuCount,
+ cpuCapacity,
+ memCapacity.roundToLong(),
totalLoad,
submissionTime,
endTime,
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index 283f82fe..90ee56cb 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -128,7 +128,8 @@ public class ComputeWorkloadRunner(
client.newFlavor(
entry.name,
entry.cpuCount,
- entry.memCapacity
+ entry.memCapacity,
+ meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
),
meta = mapOf("workload" to workload)
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
index 5dd239f6..88e80719 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -31,8 +31,9 @@ import java.util.*
*
* @param uid The unique identifier of the virtual machine.
* @param name The name of the virtual machine.
+ * @param cpuCapacity The required CPU capacity for the VM in MHz.
* @param cpuCount The number of vCPUs in the VM.
- * @param memCapacity The provisioned memory for the VM.
+ * @param memCapacity The provisioned memory for the VM in MB.
* @param startTime The start time of the VM.
* @param stopTime The stop time of the VM.
* @param trace The trace that belong to this VM.
@@ -41,6 +42,7 @@ public data class VirtualMachine(
val uid: UUID,
val name: String,
val cpuCount: Int,
+ val cpuCapacity: Double,
val memCapacity: Long,
val totalLoad: Double,
val startTime: Instant,
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
index ad182d67..a46885f4 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
@@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet
import io.opentelemetry.sdk.common.CompletableResultCode
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
import java.io.File
/**
@@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS
bufferSize
)
- override fun record(data: ServerData) {
- serverWriter.write(data)
+ override fun record(reader: ServerTableReader) {
+ serverWriter.write(reader)
}
- override fun record(data: HostData) {
- hostWriter.write(data)
+ override fun record(reader: HostTableReader) {
+ hostWriter.write(reader)
}
- override fun record(data: ServiceData) {
- serviceWriter.write(data)
+ override fun record(reader: ServiceTableReader) {
+ serviceWriter.write(reader)
}
override fun shutdown(): CompletableResultCode {
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index 4172d729..84387bbc 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -50,9 +50,9 @@ public abstract class ParquetDataWriter<in T>(
private val logger = KotlinLogging.logger {}
/**
- * The queue of commands to process.
+ * The queue of records to process.
*/
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+ private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize)
/**
* An exception to be propagated to the actual writer.
@@ -72,20 +72,20 @@ public abstract class ParquetDataWriter<in T>(
}
val queue = queue
- val buf = mutableListOf<T>()
+ val buf = mutableListOf<GenericData.Record>()
var shouldStop = false
try {
while (!shouldStop) {
try {
- process(writer, queue.take())
+ writer.write(queue.take())
} catch (e: InterruptedException) {
shouldStop = true
}
if (queue.drainTo(buf) > 0) {
for (data in buf) {
- process(writer, data)
+ writer.write(data)
}
buf.clear()
}
@@ -119,7 +119,9 @@ public abstract class ParquetDataWriter<in T>(
throw IllegalStateException("Writer thread failed", exception)
}
- queue.put(data)
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ queue.put(builder.build())
}
/**
@@ -133,13 +135,4 @@ public abstract class ParquetDataWriter<in T>(
init {
writerThread.start()
}
-
- /**
- * Process the specified [data] to be written to the Parquet file.
- */
- private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- writer.write(builder.build())
- }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 98a0739e..2b7cac8f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [HostData]s.
+ * A Parquet event writer for [HostTableReader]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ override fun convert(builder: GenericRecordBuilder, data: HostTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["host_id"] = data.host.id
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 0d11ec23..144b6624 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServerTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [ServerData]s.
+ * A Parquet event writer for [ServerTableReader]s.
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["server_id"] = data.server.id
@@ -55,9 +55,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- builder["boot_time"] = bootTime?.toEpochMilli()
- builder["scheduling_latency"] = data.schedulingLatency
+ builder["boot_time"] = data.bootTime?.toEpochMilli()
+ builder["provision_time"] = data.provisionTime?.toEpochMilli()
builder["cpu_count"] = data.server.cpuCount
builder["cpu_limit"] = data.cpuLimit
@@ -81,8 +80,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.name("host_id").type(UUID_SCHEMA.optional()).noDefault()
.requiredLong("uptime")
.requiredLong("downtime")
+ .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
.name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredLong("scheduling_latency")
.requiredInt("cpu_count")
.requiredDouble("cpu_limit")
.requiredLong("cpu_time_active")
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index 47824b29..ec8a2b65 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.io.File
/**
- * A Parquet event writer for [ServiceData]s.
+ * A Parquet event writer for [ServiceTableReader]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
- override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["hosts_up"] = data.hostsUp
builder["hosts_down"] = data.hostsDown
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
deleted file mode 100644
index 67f9626c..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.util
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
-import java.io.File
-import java.io.InputStream
-
-/**
- * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- */
-public class PerformanceInterferenceReader {
- /**
- * The [ObjectMapper] to use.
- */
- private val mapper = jacksonObjectMapper()
-
- init {
- mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
- }
-
- /**
- * Read the performance interface model from [file].
- */
- public fun read(file: File): List<VmInterferenceGroup> {
- return mapper.readValue(file)
- }
-
- /**
- * Read the performance interface model from the input.
- */
- public fun read(input: InputStream): List<VmInterferenceGroup> {
- return mapper.readValue(input)
- }
-
- private data class GroupMixin(
- @JsonProperty("minServerLoad")
- val targetLoad: Double,
- @JsonProperty("performanceScore")
- val score: Double,
- @JsonProperty("vms")
- val members: Set<String>,
- )
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
new file mode 100644
index 00000000..e0fa8904
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.util
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
+ */
+public class VmInterferenceModelReader {
+ /**
+ * The [ObjectMapper] to use.
+ */
+ private val mapper = jacksonObjectMapper()
+
+ /**
+     * Read the performance interference model from [file].
+ */
+ public fun read(file: File): VmInterferenceModel {
+ val builder = VmInterferenceModel.builder()
+ val parser = mapper.createParser(file)
+ parseGroups(parser, builder)
+ return builder.build()
+ }
+
+ /**
+     * Read the performance interference model from the input.
+ */
+ public fun read(input: InputStream): VmInterferenceModel {
+ val builder = VmInterferenceModel.builder()
+ val parser = mapper.createParser(input)
+ parseGroups(parser, builder)
+ return builder.build()
+ }
+
+ /**
+ * Parse all groups in an interference JSON file.
+ */
+ private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) {
+ parser.nextToken()
+
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextToken() != JsonToken.END_ARRAY) {
+ parseGroup(parser, builder)
+ }
+ }
+
+ /**
+     * Parse a single group in an interference JSON file.
+ */
+ private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) {
+ var targetLoad = Double.POSITIVE_INFINITY
+ var score = 1.0
+ val members = mutableSetOf<String>()
+
+ if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "vms" -> parseGroupMembers(parser, members)
+ "minServerLoad" -> targetLoad = parser.doubleValue
+ "performanceScore" -> score = parser.doubleValue
+ }
+ }
+
+ builder.addGroup(members, targetLoad, score)
+ }
+
+ /**
+ * Parse the members of a group.
+ */
+ private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_ARRAY) {
+ if (parser.currentToken() != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected string value for group member")
+ }
+
+ val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt")
+ members.add(member)
+ }
+ }
+
+ private data class Group(
+ @JsonProperty("minServerLoad")
+ val targetLoad: Double,
+ @JsonProperty("performanceScore")
+ val score: Double,
+ @JsonProperty("vms")
+ val members: Set<String>,
+ )
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
index c48bff3a..1c3e7149 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
@@ -20,24 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.telemetry.compute.table
+package org.opendc.compute.workload.util
-import java.time.Instant
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
/**
- * A trace entry for a particular server.
+ * Test suite for the [VmInterferenceModelReader] class.
*/
-public data class ServerData(
- val timestamp: Instant,
- val server: ServerInfo,
- val host: HostInfo?,
- val uptime: Long,
- val downtime: Long,
- val bootTime: Instant?,
- val schedulingLatency: Long,
- val cpuLimit: Double,
- val cpuActiveTime: Long,
- val cpuIdleTime: Long,
- val cpuStealTime: Long,
- val cpuLostTime: Long,
-)
+class VmInterferenceModelReaderTest {
+ @Test
+ fun testSmoke() {
+ val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json"))
+ assertDoesNotThrow { VmInterferenceModelReader().read(input) }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 4e855f82..53c9de11 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -30,14 +30,13 @@ import org.opendc.compute.workload.createComputeScheduler
import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
@@ -99,9 +98,8 @@ abstract class Portfolio(name: String) : Experiment(name) {
val seeder = Random(repeat.toLong())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
- PerformanceInterferenceReader()
+ VmInterferenceModelReader()
.read(File(config.getString("interference-model")))
- .let { VmInterferenceModel(it, Random(seeder.nextLong())) }
else
null
@@ -116,7 +114,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
clock,
computeScheduler,
failureModel,
- performanceInterferenceModel
+ performanceInterferenceModel?.withSeed(repeat.toLong())
)
val exporter = ParquetComputeMetricExporter(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 94e92c1b..f3a6ed1a 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -34,13 +34,12 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
@@ -177,9 +176,9 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(1.0, seed)
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
- PerformanceInterferenceReader()
+ VmInterferenceModelReader()
.read(perfInterferenceInput)
- .let { VmInterferenceModel(it, Random(seed.toLong())) }
+ .withSeed(seed.toLong())
val simulator = ComputeWorkloadRunner(
coroutineContext,
@@ -213,7 +212,7 @@ class CapelinIntegrationTest {
{ assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -285,13 +284,13 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
- override fun record(data: HostData) {
- idleTime += data.cpuIdleTime
- activeTime += data.cpuActiveTime
- stealTime += data.cpuStealTime
- lostTime += data.cpuLostTime
- energyUsage += data.powerTotal
- uptime += data.uptime
+ override fun record(reader: HostTableReader) {
+ idleTime += reader.cpuIdleTime
+ activeTime += reader.cpuActiveTime
+ stealTime += reader.cpuStealTime
+ lostTime += reader.cpuLostTime
+ energyUsage += reader.powerTotal
+ uptime += reader.uptime
}
}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index fb36d2c7..1752802f 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -34,6 +34,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.*
import java.time.Clock
@@ -128,6 +129,8 @@ public class SimTFDevice(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun onStart(conn: FlowConnection, now: Long) {
ctx = conn
capacity = conn.capacity
@@ -172,7 +175,7 @@ public class SimTFDevice(
init {
scope.launch {
- machine.run(workload)
+ machine.runWorkload(workload)
}
}
@@ -189,7 +192,7 @@ public class SimTFDevice(
}
override fun close() {
- machine.close()
+ machine.cancel()
scope.cancel()
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt
index a8b04df4..77eadbbe 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt
@@ -25,9 +25,9 @@ package org.opendc.faas.service.deployer
import org.opendc.faas.service.FunctionObject
/**
- * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions.
+ * A [FunctionInstance] is a self-contained worker—typically a container—capable of handling function executions.
*
- * Multiple, concurrent function instances can exists for a single function, for scalability purposes.
+ * Multiple, concurrent function instances can exist for a single function, for scalability purposes.
*/
public interface FunctionInstance : AutoCloseable {
/**
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 020d75b5..68233c1a 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -36,6 +36,7 @@ import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.flow.FlowEngine
import java.time.Clock
import java.util.ArrayDeque
@@ -114,7 +115,7 @@ public class SimFunctionDeployer(
override fun close() {
state = FunctionInstanceState.Deleted
stop()
- machine.close()
+ machine.cancel()
}
override fun toString(): String = "FunctionInstance[state=$state]"
@@ -130,7 +131,7 @@ public class SimFunctionDeployer(
launch {
try {
- machine.run(workload)
+ machine.runWorkload(workload)
} finally {
state = FunctionInstanceState.Deleted
}
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index a2bb89c2..ca8b912a 100644
--- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -35,7 +35,7 @@ dependencies {
api(projects.opendcSimulator.opendcSimulatorPower)
api(projects.opendcSimulator.opendcSimulatorNetwork)
implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(projects.opendcUtils)
+ implementation(libs.kotlin.logging)
testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index cb52d24f..91e91f9d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -76,7 +76,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- return@runBlockingSimulation machine.run(SimTraceWorkload(trace))
+ return@runBlockingSimulation machine.runWorkload(SimTraceWorkload(trace))
}
}
@@ -89,15 +89,15 @@ class SimMachineBenchmarks {
)
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
- val vm = hypervisor.createMachine(machineModel)
+ val vm = hypervisor.newMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(SimTraceWorkload(trace))
+ return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace))
} finally {
- vm.close()
- machine.close()
+ vm.cancel()
+ machine.cancel()
}
}
}
@@ -111,15 +111,15 @@ class SimMachineBenchmarks {
)
val hypervisor = SimFairShareHypervisor(engine, null, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
- val vm = hypervisor.createMachine(machineModel)
+ val vm = hypervisor.newMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(SimTraceWorkload(trace))
+ return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace))
} finally {
- vm.close()
- machine.close()
+ vm.cancel()
+ machine.cancel()
}
}
}
@@ -133,22 +133,22 @@ class SimMachineBenchmarks {
)
val hypervisor = SimFairShareHypervisor(engine, null, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
coroutineScope {
repeat(2) {
- val vm = hypervisor.createMachine(machineModel)
+ val vm = hypervisor.newMachine(machineModel)
launch {
try {
- vm.run(SimTraceWorkload(trace))
+ vm.runWorkload(SimTraceWorkload(trace))
} finally {
- machine.close()
+ machine.cancel()
}
}
}
}
- machine.close()
+ machine.cancel()
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
new file mode 100644
index 00000000..c23f48dc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.simulator.compute.workload.SimWorkload
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+
+/**
+ * Run the specified [SimWorkload] on this machine and suspend execution until [workload] has finished.
+ *
+ * @param workload The workload to start on the machine.
+ * @param meta The metadata to pass to the workload.
+ * This call suspends until the workload completes and cancels the machine if the coroutine is cancelled.
+ * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed.
+ */
+public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) {
+ return suspendCancellableCoroutine { cont ->
+ cont.invokeOnCancellation { this@runWorkload.cancel() }
+
+ startWorkload(
+ object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ workload.onStart(ctx)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ try {
+ workload.onStop(ctx)
+
+ if (!cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+ },
+ meta
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 60a10f20..6a4c594d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
+import mu.KotlinLogging
import org.opendc.simulator.compute.device.SimNetworkAdapter
import org.opendc.simulator.compute.device.SimPeripheral
import org.opendc.simulator.compute.model.MachineModel
@@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter
import org.opendc.simulator.compute.model.StorageDevice
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.*
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
@@ -72,48 +71,20 @@ public abstract class SimAbstractMachine(
public override val peripherals: List<SimPeripheral> = net.map { it as SimNetworkAdapter }
/**
- * A flag to indicate that the machine is terminated.
+ * The current active [Context].
*/
- private var isTerminated = false
+ private var _ctx: Context? = null
- /**
- * The continuation to resume when the virtual machine workload has finished.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
- * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- check(!isTerminated) { "Machine is terminated" }
- check(cont == null) { "A machine cannot run concurrently" }
-
- val ctx = Context(meta)
-
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
-
- // Cancel all cpus on cancellation
- cont.invokeOnCancellation {
- this.cont = null
- engine.batch {
- for (cpu in cpus) {
- cpu.cancel()
- }
- }
- }
+ override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
+ check(_ctx == null) { "A machine cannot run concurrently" }
- engine.batch { workload.onStart(ctx) }
- }
+ val ctx = Context(workload, meta)
+ ctx.start()
+ return ctx
}
- override fun close() {
- if (isTerminated) {
- return
- }
-
- isTerminated = true
- cancel()
+ override fun cancel() {
+ _ctx?.close()
}
override fun onConverge(now: Long, delta: Long) {
@@ -121,29 +92,35 @@ public abstract class SimAbstractMachine(
}
/**
- * Cancel the workload that is currently running on the machine.
+ * The execution context in which the workload runs.
+ *
+ * @param workload The workload that is running on the machine.
+ * @param meta The metadata passed to the workload.
*/
- private fun cancel() {
- engine.batch {
- for (cpu in cpus) {
- cpu.cancel()
+ private inner class Context(
+ private val workload: SimWorkload,
+ override val meta: Map<String, Any>
+ ) : SimMachineContext {
+ /**
+ * A flag to indicate that the context has been closed.
+ */
+ private var isClosed = false
+
+ override val engine: FlowEngine = this@SimAbstractMachine.engine
+
+ /**
+ * Start this context.
+ */
+ fun start() {
+ try {
+ _ctx = this
+ engine.batch { workload.onStart(this) }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Workload failed during onStart callback" }
+ close()
}
}
- val cont = cont
- if (cont != null) {
- this.cont = null
- cont.resume(Unit)
- }
- }
-
- /**
- * The execution context in which the workload runs.
- */
- private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val engine: FlowEngine
- get() = this@SimAbstractMachine.engine
-
override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
override val memory: SimMemory = this@SimAbstractMachine.memory
@@ -152,7 +129,49 @@ public abstract class SimAbstractMachine(
override val storage: List<SimStorageInterface> = this@SimAbstractMachine.storage
- override fun close() = cancel()
+ override fun close() {
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" }
+ _ctx = null
+
+ // Cancel all the resources associated with the machine
+ doCancel()
+
+ try {
+ workload.onStop(this)
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Workload failed during onStop callback" }
+ }
+ }
+
+ /**
+ * Run the stop procedures for the resources associated with the machine.
+ */
+ private fun doCancel() {
+ engine.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
+
+ memory.cancel()
+
+ for (ifx in net) {
+ (ifx as NetworkAdapterImpl).disconnect()
+ }
+
+ for (storage in storage) {
+ val impl = storage as StorageDeviceImpl
+ impl.read.cancel()
+ impl.write.cancel()
+ }
+ }
+ }
+
+ override fun toString(): String = "SimAbstractMachine.Context"
}
/**
@@ -166,7 +185,7 @@ public abstract class SimAbstractMachine(
* The [SimNetworkAdapter] implementation for a machine.
*/
private class NetworkAdapterImpl(
- private val engine: FlowEngine,
+ engine: FlowEngine,
model: NetworkAdapter,
index: Int
) : SimNetworkAdapter(), SimNetworkInterface {
@@ -208,4 +227,12 @@ public abstract class SimAbstractMachine(
override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]"
}
+
+ private companion object {
+ /**
+ * The logging instance associated with this class.
+ */
+ @JvmStatic
+ private val logger = KotlinLogging.logger {}
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index ab0b56ae..94581e89 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
/**
* A generic machine that is able to run a [SimWorkload].
*/
-public interface SimMachine : AutoCloseable {
+public interface SimMachine {
/**
* The model of the machine containing its specifications.
*/
@@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable {
public val peripherals: List<SimPeripheral>
/**
- * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * Start the specified [SimWorkload] on this machine.
+ *
+ * @param workload The workload to start on the machine.
+ * @param meta The metadata to pass to the workload.
+ * @return A [SimMachineContext] that represents the execution context for the workload.
+ * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed.
*/
- public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap())
+ public fun startWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()): SimMachineContext
/**
- * Terminate this machine.
+ * Cancel the workload that is currently running on this machine.
+ *
+ * If no workload is active, this operation is a no-op.
*/
- public override fun close()
+ public fun cancel()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index f6d8f628..07465126 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -28,7 +28,9 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceKey
import org.opendc.simulator.flow.mux.FlowMultiplexer
import kotlin.math.roundToLong
@@ -92,13 +94,20 @@ public abstract class SimAbstractHypervisor(
private val governors = mutableListOf<ScalingGovernor.Logic>()
/* SimHypervisor */
- override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
+ override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
require(canFit(model)) { "Machine does not fit" }
val vm = VirtualMachine(model, interferenceId)
_vms.add(vm)
return vm
}
+ override fun removeMachine(machine: SimVirtualMachine) {
+ if (_vms.remove(machine)) {
+ // This cast must always succeed, since `_vms` only contains `VirtualMachine` types.
+ (machine as VirtualMachine).close()
+ }
+ }
+
/* SimWorkload */
override fun onStart(ctx: SimMachineContext) {
context = ctx
@@ -121,6 +130,8 @@ public abstract class SimAbstractHypervisor(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
private var _cpuCount = 0
private var _cpuCapacity = 0.0
@@ -141,33 +152,31 @@ public abstract class SimAbstractHypervisor(
*
* @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine {
+ private inner class VirtualMachine(
+ model: MachineModel,
+ interferenceId: String? = null
+ ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable {
+ /**
+ * A flag to indicate that the machine is closed.
+ */
+ private var isClosed = false
+
/**
* The interference key of this virtual machine.
*/
- private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) }
+ private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) }
/**
* The vCPUs of the machine.
*/
- override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) }
+ override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) }
/**
* The resource counters associated with the hypervisor.
*/
override val counters: SimHypervisorCounters
get() = _counters
- private val _counters = object : SimHypervisorCounters {
- private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
-
- override val cpuActiveTime: Long
- get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
- override val cpuIdleTime: Long
- get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
- override val cpuStealTime: Long
- get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
- override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong()
- }
+ private val _counters = VmCountersImpl(cpus)
/**
* The CPU capacity of the hypervisor in MHz.
@@ -187,14 +196,58 @@ public abstract class SimAbstractHypervisor(
override val cpuUsage: Double
get() = cpus.sumOf(FlowConsumer::rate)
+ override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
+ check(!isClosed) { "Machine is closed" }
+
+ return super.startWorkload(
+ object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ joinInterferenceDomain()
+ workload.onStart(ctx)
+ } catch (cause: Throwable) {
+ leaveInterferenceDomain()
+ throw cause
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ leaveInterferenceDomain()
+ workload.onStop(ctx)
+ }
+ },
+ meta
+ )
+ }
+
override fun close() {
- super.close()
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ cancel()
for (cpu in cpus) {
cpu.close()
}
+ }
- _vms.remove(this)
+ /**
+ * Join the interference domain of the hypervisor.
+ */
+ private fun joinInterferenceDomain() {
+ val interferenceKey = interferenceKey
+ if (interferenceKey != null) {
+ interferenceDomain?.join(interferenceKey)
+ }
+ }
+
+ /**
+ * Leave the interference domain of the hypervisor.
+ */
+ private fun leaveInterferenceDomain() {
+ val interferenceKey = interferenceKey
if (interferenceKey != null) {
interferenceDomain?.leave(interferenceKey)
}
@@ -211,9 +264,7 @@ public abstract class SimAbstractHypervisor(
) : SimProcessingUnit, FlowConsumer by source {
override var capacity: Double
get() = source.capacity
- set(_) {
- // Ignore capacity changes
- }
+ set(_) = TODO("Capacity changes on vCPU not supported")
override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
@@ -287,4 +338,20 @@ public abstract class SimAbstractHypervisor(
cpuTime[3] += (interferenceDelta * d).roundToLong()
}
}
+
+ /**
+ * A [SimHypervisorCounters] implementation for a virtual machine.
+ */
+ private class VmCountersImpl(private val cpus: List<VCpu>) : SimHypervisorCounters {
+ private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
+
+ override val cpuActiveTime: Long
+ get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
+ override val cpuIdleTime: Long
+ get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
+ override val cpuStealTime: Long
+ get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
+ override val cpuLostTime: Long
+ get() = (cpus.sumOf { it.counters.interference } * d).roundToLong()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 57d4cf20..a69f419f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload {
* @param model The machine to create.
* @param interferenceId An identifier for the interference model.
*/
- public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
+ public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
+
+ /**
+ * Remove the specified [machine] from the hypervisor.
+ *
+ * @param machine The machine to remove.
+ */
+ public fun removeMachine(machine: SimVirtualMachine)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
index b737d61a..09b03306 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
@@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey
*/
public interface VmInterferenceDomain : InterferenceDomain {
/**
- * Join this interference domain.
+ * Construct an [InterferenceKey] for the specified [id].
*
* @param id The identifier of the virtual machine.
+ * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine
+ * does not participate in the domain.
*/
- public fun join(id: String): InterferenceKey
+ public fun createKey(id: String): InterferenceKey?
/**
- * Leave this interference domain.
+ * Remove the specified [key] from this domain.
+ */
+ public fun removeKey(key: InterferenceKey)
+
+ /**
+ * Mark the specified [key] as active in this interference domain.
+ *
+ * @param key The key to join the interference domain with.
+ */
+ public fun join(key: InterferenceKey)
+
+ /**
+ * Mark the specified [key] as inactive in this interference domain.
+ *
+ * @param key The key of the virtual machine that wants to leave the domain.
*/
public fun leave(key: InterferenceKey)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
index b3d72507..977292be 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
@@ -28,143 +28,366 @@ import java.util.*
/**
* An interference model that models the resource interference between virtual machines on a host.
*
- * @param groups The groups of virtual machines that interfere with each other.
- * @param random The [Random] instance to select the affected virtual machines.
+ * @param targets The target load of each group.
+ * @param scores The performance score of each group.
+ * @param members The members belonging to each group.
+ * @param membership The identifiers of the groups to which each virtual machine belongs.
+ * @param size The number of groups.
+ * @param seed The seed to use for randomly selecting the virtual machines that are affected.
*/
-public class VmInterferenceModel(
- private val groups: List<VmInterferenceGroup>,
- private val random: Random = Random(0)
+public class VmInterferenceModel private constructor(
+ private val targets: DoubleArray,
+ private val scores: DoubleArray,
+ private val idMapping: Map<String, Int>,
+ private val members: Array<IntArray>,
+ private val membership: Array<IntArray>,
+ private val size: Int,
+ seed: Long,
) {
/**
+ * A [SplittableRandom] used for selecting the virtual machines that are affected.
+ */
+ private val random = SplittableRandom(seed)
+
+ /**
* Construct a new [VmInterferenceDomain].
*/
- public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain {
+ public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random)
+
+ /**
+ * Create a copy of this model with a different seed.
+ */
+ public fun withSeed(seed: Long): VmInterferenceModel {
+ return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed)
+ }
+
+ public companion object {
/**
- * The stateful groups of this domain.
+ * Construct a [Builder] instance.
*/
- private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) }
+ @JvmStatic
+ public fun builder(): Builder = Builder()
+ }
+ /**
+ * Builder class for a [VmInterferenceModel]
+ */
+ public class Builder internal constructor() {
/**
- * The set of keys active in this domain.
+ * The initial capacity of the builder.
*/
- private val keys = mutableSetOf<InterferenceKeyImpl>()
+ private val INITIAL_CAPACITY = 256
- override fun join(id: String): InterferenceKey {
- val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad })
- keys += key
- return key
- }
+ /**
+ * The target load of each group.
+ */
+ private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY }
- override fun leave(key: InterferenceKey) {
- if (key is InterferenceKeyImpl) {
- keys -= key
- key.leave()
+ /**
+ * The performance score of each group.
+ */
+ private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY }
+
+ /**
+ * The members of each group.
+ */
+ private var _members = ArrayList<Set<String>>(INITIAL_CAPACITY)
+
+ /**
+ * The mapping from member to group id.
+ */
+ private val ids = TreeSet<String>()
+
+ /**
+ * The number of groups in the model.
+ */
+ private var size = 0
+
+ /**
+ * Add the specified group to the model.
+ */
+ public fun addGroup(members: Set<String>, targetLoad: Double, score: Double): Builder {
+ val size = size
+
+ if (size == _targets.size) {
+ grow()
}
+
+ _targets[size] = targetLoad
+ _scores[size] = score
+ _members.add(members)
+ ids.addAll(members)
+
+ this.size++
+
+ return this
}
- override fun apply(key: InterferenceKey?, load: Double): Double {
- if (key == null || key !is InterferenceKeyImpl) {
- return 1.0
- }
+ /**
+ * Build the [VmInterferenceModel].
+ */
+ public fun build(seed: Long = 0): VmInterferenceModel {
+ val size = size
+ val targets = _targets
+ val scores = _scores
+ val members = _members
- val ctx = key.findGroup(load)
- val group = ctx?.group
+ val indices = Array(size) { it }
+ indices.sortWith(
+ Comparator { l, r ->
+ var cmp = targets[l].compareTo(targets[r]) // Order by target load
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
- // Apply performance penalty to (on average) only one of the VMs
- return if (group != null && random.nextInt(group.members.size) == 0) {
- group.score
- } else {
- 1.0
+ cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first)
+ if (cmp != 0)
+ cmp
+ else
+ l.compareTo(r)
+ }
+ )
+
+ val newTargets = DoubleArray(size)
+ val newScores = DoubleArray(size)
+ val newMembers = arrayOfNulls<IntArray>(size)
+
+ var nextId = 0
+ val idMapping = ids.associateWith { nextId++ }
+ val membership = ids.associateWithTo(TreeMap()) { ArrayList<Int>() }
+
+ for ((group, j) in indices.withIndex()) {
+ newTargets[group] = targets[j]
+ newScores[group] = scores[j]
+ val groupMembers = members[j]
+ val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray()
+
+ newGroupMembers.sort()
+ newMembers[group] = newGroupMembers
+
+ for (member in groupMembers) {
+ membership.getValue(member).add(group)
+ }
}
+
+ @Suppress("UNCHECKED_CAST")
+ return VmInterferenceModel(
+ newTargets,
+ newScores,
+ idMapping,
+ newMembers as Array<IntArray>,
+ membership.map { it.value.toIntArray() }.toTypedArray(),
+ size,
+ seed
+ )
}
- override fun toString(): String = "VmInterferenceDomain"
+ /**
+ * Helper function to grow the capacity of the internal arrays.
+ */
+ private fun grow() {
+ val oldSize = _targets.size
+ val newSize = oldSize + (oldSize shr 1)
+
+ _targets = _targets.copyOf(newSize)
+ _scores = _scores.copyOf(newSize)
+ }
}
/**
- * An interference key.
- *
- * @param id The identifier of the member.
- * @param groups The groups to which the key belongs.
+ * Internal implementation of [VmInterferenceDomain].
*/
- private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey {
- init {
- for (group in groups) {
- group.join(this)
- }
- }
+ private class InterferenceDomainImpl(
+ private val targets: DoubleArray,
+ private val scores: DoubleArray,
+ private val idMapping: Map<String, Int>,
+ private val members: Array<IntArray>,
+ private val membership: Array<IntArray>,
+ private val random: SplittableRandom
+ ) : VmInterferenceDomain {
+ /**
+ * Keys registered with this domain.
+ */
+ private val keys = HashMap<Int, InterferenceKeyImpl>()
/**
- * Find the active group that applies for the interference member.
+     * The sorted list of keys active in this domain.
*/
- fun findGroup(load: Double): GroupContext? {
- // Find the first active group whose target load is lower than the current load
- val index = groups.binarySearchBy(load) { it.group.targetLoad }
- val target = if (index >= 0) index else -(index + 1)
+ private val activeKeys = ArrayList<InterferenceKeyImpl>()
- // Check whether there are active groups ahead of the index
- for (i in target until groups.size) {
- val group = groups[i]
- if (group.group.targetLoad > load) {
- break
- } else if (group.isActive) {
- return group
+ override fun createKey(id: String): InterferenceKey? {
+ val intId = idMapping[id] ?: return null
+ return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) }
+ }
+
+ override fun removeKey(key: InterferenceKey) {
+ if (key !is InterferenceKeyImpl) {
+ return
+ }
+
+ if (activeKeys.remove(key)) {
+ computeActiveGroups(key.id)
+ }
+
+ keys.remove(key.id)
+ }
+
+ override fun join(key: InterferenceKey) {
+ if (key !is InterferenceKeyImpl) {
+ return
+ }
+
+ if (key.acquire()) {
+ val pos = activeKeys.binarySearch(key)
+ if (pos < 0) {
+ activeKeys.add(-pos - 1, key)
}
+ computeActiveGroups(key.id)
}
+ }
+
+ override fun leave(key: InterferenceKey) {
+ if (key is InterferenceKeyImpl && key.release()) {
+ activeKeys.remove(key)
+ computeActiveGroups(key.id)
+ }
+ }
+
+ override fun apply(key: InterferenceKey?, load: Double): Double {
+ if (key == null || key !is InterferenceKeyImpl) {
+ return 1.0
+ }
+
+ val groups = key.groups
+ val groupSize = groups.size
+
+ if (groupSize == 0) {
+ return 1.0
+ }
+
+ val targets = targets
+ val scores = scores
+ var low = 0
+ var high = groups.size - 1
- // Check whether there are active groups before the index
- for (i in (target - 1) downTo 0) {
- val group = groups[i]
- if (group.isActive) {
- return group
+ var group = -1
+ var score = 1.0
+
+ // Perform binary search over the groups based on target load
+ while (low <= high) {
+ val mid = low + high ushr 1
+ val midGroup = groups[mid]
+ val target = targets[midGroup]
+
+ if (target < load) {
+ low = mid + 1
+ group = midGroup
+ score = scores[midGroup]
+ } else if (target > load) {
+ high = mid - 1
+ } else {
+ group = midGroup
+ score = scores[midGroup]
+ break
}
}
- return null
+ return if (group >= 0 && random.nextInt(members[group].size) == 0) {
+ score
+ } else {
+ 1.0
+ }
}
+ override fun toString(): String = "VmInterferenceDomain"
+
/**
- * Leave all the groups.
+     * Queue of participants that will be removed from or added to the active groups.
*/
- fun leave() {
+ private val _participants = ArrayDeque<InterferenceKeyImpl>()
+
+ /**
+ * (Re-)Compute the active groups.
+ */
+ private fun computeActiveGroups(id: Int) {
+ val activeKeys = activeKeys
+ val groups = membership[id]
+
+ if (activeKeys.isEmpty()) {
+ return
+ }
+
+ val members = members
+ val participants = _participants
+
for (group in groups) {
- group.leave(this)
+ val groupMembers = members[group]
+
+ var i = 0
+ var j = 0
+ var intersection = 0
+
+ // Compute the intersection of the group members and the current active members
+ while (i < groupMembers.size && j < activeKeys.size) {
+ val l = groupMembers[i]
+ val rightEntry = activeKeys[j]
+ val r = rightEntry.id
+
+ if (l < r) {
+ i++
+ } else if (l > r) {
+ j++
+ } else {
+ participants.add(rightEntry)
+ intersection++
+
+ i++
+ j++
+ }
+ }
+
+ while (true) {
+ val participant = participants.poll() ?: break
+ val participantGroups = participant.groups
+ if (intersection <= 1) {
+ participantGroups.remove(group)
+ } else {
+ val pos = participantGroups.binarySearch(group)
+ if (pos < 0) {
+ participantGroups.add(-pos - 1, group)
+ }
+ }
+ }
}
}
}
/**
- * A group context is used to track the active keys per interference group.
+ * An interference key.
+ *
+ * @param id The identifier of the member.
*/
- private inner class GroupContext(val group: VmInterferenceGroup) {
+ private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> {
/**
- * The active keys that are part of this group.
+ * The active groups to which the key belongs.
*/
- private val keys = mutableSetOf<InterferenceKeyImpl>()
+ @JvmField val groups: MutableList<Int> = ArrayList()
/**
- * A flag to indicate that the group is active.
+ * The number of users of the interference key.
*/
- val isActive
- get() = keys.size > 1
+ private var refCount: Int = 0
/**
- * Determine whether the specified [id] is part of this group.
+ * Join the domain.
*/
- operator fun contains(id: String): Boolean = id in group.members
+ fun acquire(): Boolean = refCount++ <= 0
/**
- * Join this group with the specified [key].
+ * Leave the domain.
*/
- fun join(key: InterferenceKeyImpl) {
- keys += key
- }
+ fun release(): Boolean = --refCount <= 0
- /**
- * Leave this group with the specified [key].
- */
- fun leave(key: InterferenceKeyImpl) {
- keys -= key
- }
+ override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 99f4a1e1..726d1f56 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -48,5 +48,7 @@ public class SimFlopsWorkload(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 2ef3bc43..8a3f5f84 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -48,5 +48,7 @@ public class SimRuntimeWorkload(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 53c98409..ce04a790 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimTraceWorkload"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index b80665fa..61c6e2ad 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -37,4 +37,11 @@ public interface SimWorkload {
* @param ctx The execution context in which the machine runs.
*/
public fun onStart(ctx: SimMachineContext)
+
+ /**
+ * This method is invoked when the workload is stopped.
+ *
+ * @param ctx The execution context in which the machine runs.
+ */
+ public fun onStop(ctx: SimMachineContext)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index cc4f1f6a..742470a1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
/**
* The resource consumers which represent the lifecycle of the workload.
*/
- private val waiting = mutableSetOf<FlowSource>()
+ private val waiting = HashSet<Wrapper>()
/**
- * Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
+ * Wait for the specified [source] to complete before ending the lifecycle of the workload.
*/
- public fun waitFor(consumer: FlowSource): FlowSource {
- waiting.add(consumer)
- return object : FlowSource by consumer {
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- try {
- consumer.onStop(conn, now, delta)
- } finally {
- complete(consumer)
- }
- }
- override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]"
- }
+ public fun waitFor(source: FlowSource): FlowSource {
+ val wrapper = Wrapper(source)
+ waiting.add(wrapper)
+ return wrapper
}
/**
- * Complete the specified [FlowSource].
+ * Complete the specified [Wrapper].
*/
- private fun complete(consumer: FlowSource) {
- if (waiting.remove(consumer) && waiting.isEmpty()) {
+ private fun complete(wrapper: Wrapper) {
+ if (waiting.remove(wrapper) && waiting.isEmpty()) {
ctx.close()
}
}
+
+ /**
+     * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that it has completed.
+ */
+ private inner class Wrapper(private val delegate: FlowSource) : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ delegate.onStart(conn, now)
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ delegate.onConverge(conn, now, delta)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate.onStop(conn, now, delta)
+ } finally {
+ complete(this)
+ }
+ }
+
+ override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]"
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 0bb24ed8..644eb497 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -65,14 +65,10 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
- // Two cores execute 1000 MFlOps per second (1000 ms)
- assertEquals(1000, clock.millis())
- } finally {
- machine.close()
- }
+ // Two cores execute 1000 MFlOps per second (1000 ms)
+ assertEquals(1000, clock.millis())
}
@Test
@@ -88,14 +84,10 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
- // Two sockets with two cores execute 2000 MFlOps per second (500 ms)
- assertEquals(500, clock.millis())
- } finally {
- machine.close()
- }
+ // Two sockets with two cores execute 2000 MFlOps per second (500 ms)
+ assertEquals(500, clock.millis())
}
@Test
@@ -109,16 +101,12 @@ class SimMachineTest {
val source = SimPowerSource(engine, capacity = 1000.0)
source.connect(machine.psu)
- try {
- coroutineScope {
- launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) }
- assertAll(
- { assertEquals(100.0, machine.psu.powerDraw) },
- { assertEquals(100.0, source.powerDraw) }
- )
- }
- } finally {
- machine.close()
+ coroutineScope {
+ launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) }
+ assertAll(
+ { assertEquals(100.0, machine.psu.powerDraw) },
+ { assertEquals(100.0, source.powerDraw) }
+ )
}
}
@@ -130,22 +118,20 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val cpu = ctx.cpus[0]
-
- cpu.capacity = cpu.model.frequency + 1000.0
- assertEquals(cpu.model.frequency, cpu.capacity)
- cpu.capacity = -1.0
- assertEquals(0.0, cpu.capacity)
-
- ctx.close()
- }
- })
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val cpu = ctx.cpus[0]
+
+ cpu.capacity = cpu.model.frequency + 1000.0
+ assertEquals(cpu.model.frequency, cpu.capacity)
+ cpu.capacity = -1.0
+ assertEquals(0.0, cpu.capacity)
+
+ ctx.close()
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
}
@Test
@@ -156,16 +142,14 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- assertEquals(32_000 * 4.0, ctx.memory.capacity)
- ctx.close()
- }
- })
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ assertEquals(32_000 * 4.0, ctx.memory.capacity)
+ ctx.close()
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
}
@Test
@@ -176,18 +160,16 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -202,19 +184,17 @@ class SimMachineTest {
val adapter = (machine.peripherals[0] as SimNetworkAdapter)
adapter.connect(SimNetworkSink(engine, adapter.bandwidth))
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- val iface = ctx.net[0]
- iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ val iface = ctx.net[0]
+ iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -226,19 +206,17 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- val disk = ctx.storage[0]
- disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ val disk = ctx.storage[0]
+ disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -250,19 +228,17 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- val disk = ctx.storage[0]
- disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ val disk = ctx.storage[0]
+ disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -275,13 +251,11 @@ class SimMachineTest {
try {
coroutineScope {
- launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) }
+ launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) }
cancel()
}
} catch (_: CancellationException) {
// Ignore
- } finally {
- machine.close()
}
assertEquals(0, clock.millis())
@@ -295,31 +269,14 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- coroutineScope {
- launch {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
- }
+ coroutineScope {
+ launch {
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
+ }
- assertThrows<IllegalStateException> {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
- }
+ assertThrows<IllegalStateException> {
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
}
- } finally {
- machine.close()
}
}
-
- @Test
- fun testClose() = runBlockingSimulation {
- val machine = SimBareMetalMachine(
- FlowEngine(coroutineContext, clock),
- machineModel,
- SimplePowerDriver(ConstantPowerModel(0.0))
- )
-
- machine.close()
- assertDoesNotThrow { machine.close() }
- assertThrows<IllegalStateException> { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) }
- }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 6f32cf46..91855e8d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertDoesNotThrow
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -38,6 +37,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -81,16 +81,16 @@ internal class SimFairShareHypervisorTest {
val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null)
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
println("Hypervisor finished")
}
yield()
- val vm = hypervisor.createMachine(model)
- vm.run(workloadA)
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadA)
yield()
- machine.close()
+ machine.cancel()
assertAll(
{ assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") },
@@ -132,22 +132,22 @@ internal class SimFairShareHypervisorTest {
val hypervisor = SimFairShareHypervisor(platform, null, null, null)
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
}
yield()
coroutineScope {
launch {
- val vm = hypervisor.createMachine(model)
- vm.run(workloadA)
- vm.close()
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadA)
+ hypervisor.removeMachine(vm)
}
- val vm = hypervisor.createMachine(model)
- vm.run(workloadB)
- vm.close()
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadB)
+ hypervisor.removeMachine(vm)
}
yield()
- machine.close()
+ machine.cancel()
yield()
assertAll(
@@ -172,11 +172,11 @@ internal class SimFairShareHypervisorTest {
assertDoesNotThrow {
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
}
}
- machine.close()
+ machine.cancel()
}
@Test
@@ -187,12 +187,11 @@ internal class SimFairShareHypervisorTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val groups = listOf(
- VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")),
- VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")),
- VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
- )
- val interferenceModel = VmInterferenceModel(groups)
+ val interferenceModel = VmInterferenceModel.builder()
+ .addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b"))
+ .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c"))
+ .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
+ .build()
val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
@@ -221,20 +220,20 @@ internal class SimFairShareHypervisorTest {
)
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
}
coroutineScope {
launch {
- val vm = hypervisor.createMachine(model, "a")
- vm.run(workloadA)
- vm.close()
+ val vm = hypervisor.newMachine(model, "a")
+ vm.runWorkload(workloadA)
+ hypervisor.removeMachine(vm)
}
- val vm = hypervisor.createMachine(model, "b")
- vm.run(workloadB)
- vm.close()
+ val vm = hypervisor.newMachine(model, "b")
+ vm.runWorkload(workloadB)
+ hypervisor.removeMachine(vm)
}
- machine.close()
+ machine.cancel()
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 02d308ff..823a0ae3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -36,6 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -76,13 +77,13 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
- val vm = hypervisor.createMachine(machineModel)
- vm.run(workloadA)
+ launch { machine.runWorkload(hypervisor) }
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(workloadA)
yield()
- vm.close()
- machine.close()
+ hypervisor.removeMachine(vm)
+ machine.cancel()
assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" }
}
@@ -98,12 +99,13 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- val vm = hypervisor.createMachine(machineModel)
- vm.run(workload)
- vm.close()
- machine.close()
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(workload)
+ hypervisor.removeMachine(vm)
+
+ machine.cancel()
assertEquals(duration, clock.millis()) { "Took enough time" }
}
@@ -121,11 +123,11 @@ internal class SimSpaceSharedHypervisorTest {
)
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- val vm = hypervisor.createMachine(machineModel)
- vm.run(workload)
- machine.close()
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(workload)
+ machine.cancel()
assertEquals(duration, clock.millis()) { "Took enough time" }
}
@@ -142,19 +144,20 @@ internal class SimSpaceSharedHypervisorTest {
)
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- val vm = hypervisor.createMachine(machineModel)
- vm.run(SimRuntimeWorkload(duration))
- vm.close()
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(SimRuntimeWorkload(duration))
+ hypervisor.removeMachine(vm)
yield()
- val vm2 = hypervisor.createMachine(machineModel)
- vm2.run(SimRuntimeWorkload(duration))
- vm2.close()
- machine.close()
+ val vm2 = hypervisor.newMachine(machineModel)
+ vm2.runWorkload(SimRuntimeWorkload(duration))
+ hypervisor.removeMachine(vm2)
+
+ machine.cancel()
assertEquals(duration * 2, clock.millis()) { "Took enough time" }
}
@@ -168,17 +171,17 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- hypervisor.createMachine(machineModel)
+ hypervisor.newMachine(machineModel)
assertAll(
{ assertFalse(hypervisor.canFit(machineModel)) },
- { assertThrows<IllegalArgumentException> { hypervisor.createMachine(machineModel) } }
+ { assertThrows<IllegalArgumentException> { hypervisor.newMachine(machineModel) } }
)
- machine.close()
+ machine.cancel()
}
/**
@@ -192,16 +195,16 @@ internal class SimSpaceSharedHypervisorTest {
)
val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- hypervisor.createMachine(machineModel).close()
+ hypervisor.removeMachine(hypervisor.newMachine(machineModel))
assertAll(
{ assertTrue(hypervisor.canFit(machineModel)) },
- { assertDoesNotThrow { hypervisor.createMachine(machineModel) } }
+ { assertDoesNotThrow { hypervisor.newMachine(machineModel) } }
)
- machine.close()
+ machine.cancel()
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index 574860e8..aa91984a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -30,6 +30,7 @@ import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.model.*
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -67,13 +68,9 @@ class SimTraceWorkloadTest {
offset = 0
)
- try {
- machine.run(workload)
+ machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(4000, clock.millis())
}
@Test
@@ -94,13 +91,9 @@ class SimTraceWorkloadTest {
offset = 1000
)
- try {
- machine.run(workload)
+ machine.runWorkload(workload)
- assertEquals(5000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(5000, clock.millis())
}
@Test
@@ -121,14 +114,10 @@ class SimTraceWorkloadTest {
offset = 0
)
- try {
- delay(1000L)
- machine.run(workload)
+ delay(1000L)
+ machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(4000, clock.millis())
}
@Test
@@ -149,12 +138,8 @@ class SimTraceWorkloadTest {
offset = 0
)
- try {
- machine.run(workload)
+ machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(4000, clock.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 04ba7f21..a7877546 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -62,13 +62,21 @@ public interface FlowMultiplexer {
public val counters: FlowCounters
/**
- * Create a new input on this multiplexer.
+ * Create a new input on this multiplexer with a coupled capacity.
*
* @param key The key of the interference member to which the input belongs.
*/
public fun newInput(key: InterferenceKey? = null): FlowConsumer
/**
+ * Create a new input on this multiplexer with the specified [capacity].
+ *
+ * @param capacity The capacity of the input.
+ * @param key The key of the interference member to which the input belongs.
+ */
+ public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer
+
+ /**
* Remove [input] from this multiplexer.
*/
public fun removeInput(input: FlowConsumer)
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 125d10fe..b68a8baa 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -78,6 +78,8 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
return input
}
+ override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key)
+
override fun removeInput(input: FlowConsumer) {
if (!_inputs.remove(input)) {
return
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index a0fb8a4e..3d26efda 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -86,7 +86,15 @@ public class MaxMinFlowMultiplexer(
private val scheduler = Scheduler(engine, parent)
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity)
+ return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key)
+ }
+
+ override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer {
+ return newInput(isCoupled = false, capacity, key)
+ }
+
+ private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer {
+ val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity)
_inputs.add(provider)
return provider
}
@@ -206,7 +214,10 @@ public class MaxMinFlowMultiplexer(
// Disable timers and convergence of the source if one of the output manages it
input.shouldConsumerConverge = !hasActivationOutput
input.enableTimers = !hasActivationOutput
- input.capacity = capacity
+
+ if (input.isCoupled) {
+ input.capacity = capacity
+ }
trigger(_clock.millis())
}
@@ -340,7 +351,9 @@ public class MaxMinFlowMultiplexer(
capacity = newCapacity
for (input in _activeInputs) {
- input.capacity = newCapacity
+ if (input.isCoupled) {
+ input.capacity = newCapacity
+ }
}
// Sort outputs by their capacity
@@ -495,6 +508,7 @@ public class MaxMinFlowMultiplexer(
private val scheduler: Scheduler,
private val interferenceDomain: InterferenceDomain?,
@JvmField val key: InterferenceKey?,
+ @JvmField val isCoupled: Boolean,
initialCapacity: Double,
) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
/**
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
index 738ec38b..418dc201 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -20,6 +20,8 @@
* SOFTWARE.
*/
+@file:Suppress("PropertyName")
+
package org.opendc.telemetry.compute
import io.opentelemetry.api.common.AttributeKey
@@ -30,10 +32,9 @@ import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.telemetry.compute.table.*
import java.time.Instant
-import kotlin.math.roundToLong
/**
- * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader].
*/
public class ComputeMetricAggregator {
private val _service = ServiceAggregator()
@@ -59,34 +60,28 @@ public class ComputeMetricAggregator {
service.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> service.hostsUp = point.value.toInt()
- "down" -> service.hostsDown = point.value.toInt()
+ "up" -> service._hostsUp = point.value.toInt()
+ "down" -> service._hostsDown = point.value.toInt()
}
}
}
"scheduler.servers" -> {
for (point in metric.longSumData.points) {
when (point.attributes[STATE_KEY]) {
- "pending" -> service.serversPending = point.value.toInt()
- "active" -> service.serversActive = point.value.toInt()
+ "pending" -> service._serversPending = point.value.toInt()
+ "active" -> service._serversActive = point.value.toInt()
}
}
}
"scheduler.attempts" -> {
for (point in metric.longSumData.points) {
when (point.attributes[RESULT_KEY]) {
- "success" -> service.attemptsSuccess = point.value.toInt()
- "failure" -> service.attemptsFailure = point.value.toInt()
- "error" -> service.attemptsError = point.value.toInt()
+ "success" -> service._attemptsSuccess = point.value.toInt()
+ "failure" -> service._attemptsFailure = point.value.toInt()
+ "error" -> service._attemptsError = point.value.toInt()
}
}
}
- "scheduler.latency" -> {
- for (point in metric.doubleHistogramData.points) {
- val server = getServer(servers, point) ?: continue
- server.schedulingLatency = (point.sum / point.count).roundToLong()
- }
- }
// SimHost
"system.guests" -> {
@@ -94,10 +89,10 @@ public class ComputeMetricAggregator {
for (point in metric.longSumData.points) {
when (point.attributes[STATE_KEY]) {
- "terminated" -> agg.guestsTerminated = point.value.toInt()
- "running" -> agg.guestsRunning = point.value.toInt()
- "error" -> agg.guestsRunning = point.value.toInt()
- "invalid" -> agg.guestsInvalid = point.value.toInt()
+ "terminated" -> agg._guestsTerminated = point.value.toInt()
+ "running" -> agg._guestsRunning = point.value.toInt()
+ "error" -> agg._guestsError = point.value.toInt()
+ "invalid" -> agg._guestsInvalid = point.value.toInt()
}
}
}
@@ -108,24 +103,24 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
- server.cpuLimit = point.value
- server.host = agg.host
+ server._cpuLimit = point.value
+ server._host = agg.host
} else {
- agg.cpuLimit = point.value
+ agg._cpuLimit = point.value
}
}
}
"system.cpu.usage" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuUsage = metric.doubleGaugeData.points.first().value
+ agg._cpuUsage = metric.doubleGaugeData.points.first().value
}
"system.cpu.demand" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuDemand = metric.doubleGaugeData.points.first().value
+ agg._cpuDemand = metric.doubleGaugeData.points.first().value
}
"system.cpu.utilization" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuUtilization = metric.doubleGaugeData.points.first().value
+ agg._cpuUtilization = metric.doubleGaugeData.points.first().value
}
"system.cpu.time" -> {
val agg = getHost(hosts, resource) ?: continue
@@ -135,29 +130,29 @@ public class ComputeMetricAggregator {
val state = point.attributes[STATE_KEY]
if (server != null) {
when (state) {
- "active" -> server.cpuActiveTime = point.value
- "idle" -> server.cpuIdleTime = point.value
- "steal" -> server.cpuStealTime = point.value
- "lost" -> server.cpuLostTime = point.value
+ "active" -> server._cpuActiveTime = point.value
+ "idle" -> server._cpuIdleTime = point.value
+ "steal" -> server._cpuStealTime = point.value
+ "lost" -> server._cpuLostTime = point.value
}
- server.host = agg.host
+ server._host = agg.host
} else {
when (state) {
- "active" -> agg.cpuActiveTime = point.value
- "idle" -> agg.cpuIdleTime = point.value
- "steal" -> agg.cpuStealTime = point.value
- "lost" -> agg.cpuLostTime = point.value
+ "active" -> agg._cpuActiveTime = point.value
+ "idle" -> agg._cpuIdleTime = point.value
+ "steal" -> agg._cpuStealTime = point.value
+ "lost" -> agg._cpuLostTime = point.value
}
}
}
}
"system.power.usage" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.powerUsage = metric.doubleGaugeData.points.first().value
+ agg._powerUsage = metric.doubleGaugeData.points.first().value
}
"system.power.total" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.powerTotal = metric.doubleSumData.points.first().value
+ agg._powerTotal = metric.doubleSumData.points.first().value
}
"system.time" -> {
val agg = getHost(hosts, resource) ?: continue
@@ -169,16 +164,16 @@ public class ComputeMetricAggregator {
server.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> server.uptime = point.value
- "down" -> server.downtime = point.value
+ "up" -> server._uptime = point.value
+ "down" -> server._downtime = point.value
}
- server.host = agg.host
+ server._host = agg.host
} else {
agg.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> agg.uptime = point.value
- "down" -> agg.downtime = point.value
+ "up" -> agg._uptime = point.value
+ "down" -> agg._downtime = point.value
}
}
}
@@ -190,13 +185,20 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
- server.bootTime = point.value
- server.host = agg.host
+ server._bootTime = Instant.ofEpochMilli(point.value)
+ server._host = agg.host
} else {
- agg.bootTime = point.value
+ agg._bootTime = Instant.ofEpochMilli(point.value)
}
}
}
+ "system.time.provision" -> {
+ for (point in metric.longGaugeData.points) {
+ val server = getServer(servers, point) ?: continue
+ server.recordTimestamp(point)
+ server._provisionTime = Instant.ofEpochMilli(point.value)
+ }
+ }
}
}
}
@@ -205,14 +207,16 @@ public class ComputeMetricAggregator {
* Collect the data via the [monitor].
*/
public fun collect(monitor: ComputeMonitor) {
- monitor.record(_service.collect())
+ monitor.record(_service)
for (host in _hosts.values) {
- monitor.record(host.collect())
+ monitor.record(host)
+ host.reset()
}
for (server in _servers.values) {
- monitor.record(server.collect())
+ monitor.record(server)
+ server.reset()
}
}
@@ -222,7 +226,7 @@ public class ComputeMetricAggregator {
private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? {
val id = resource.attributes[HOST_ID]
return if (id != null) {
- hosts.computeIfAbsent(id) { HostAggregator(resource) }
+ hosts.getOrPut(id) { HostAggregator(resource) }
} else {
null
}
@@ -234,7 +238,7 @@ public class ComputeMetricAggregator {
private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? {
val id = point.attributes[ResourceAttributes.HOST_ID]
return if (id != null) {
- servers.computeIfAbsent(id) { ServerAggregator(point.attributes) }
+ servers.getOrPut(id) { ServerAggregator(point.attributes) }
} else {
null
}
@@ -243,50 +247,55 @@ public class ComputeMetricAggregator {
/**
* An aggregator for service metrics before they are reported.
*/
- internal class ServiceAggregator {
- private var timestamp = Long.MIN_VALUE
+ internal class ServiceAggregator : ServiceTableReader {
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
- @JvmField var hostsUp = 0
- @JvmField var hostsDown = 0
+ override val hostsUp: Int
+ get() = _hostsUp
+ @JvmField var _hostsUp = 0
- @JvmField var serversPending = 0
- @JvmField var serversActive = 0
+ override val hostsDown: Int
+ get() = _hostsDown
+ @JvmField var _hostsDown = 0
- @JvmField var attemptsSuccess = 0
- @JvmField var attemptsFailure = 0
- @JvmField var attemptsError = 0
+ override val serversPending: Int
+ get() = _serversPending
+ @JvmField var _serversPending = 0
- /**
- * Finish the aggregation for this cycle.
- */
- fun collect(): ServiceData {
- val now = Instant.ofEpochMilli(timestamp)
- return toServiceData(now)
- }
+ override val serversActive: Int
+ get() = _serversActive
+ @JvmField var _serversActive = 0
- /**
- * Convert the aggregator state to an immutable [ServiceData].
- */
- private fun toServiceData(now: Instant): ServiceData {
- return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
- }
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ @JvmField var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ @JvmField var _attemptsFailure = 0
+
+ override val attemptsError: Int
+ get() = _attemptsError
+ @JvmField var _attemptsError = 0
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
/**
* An aggregator for host metrics before they are reported.
*/
- internal class HostAggregator(resource: Resource) {
+ internal class HostAggregator(resource: Resource) : HostTableReader {
/**
* The static information about this host.
*/
- val host = HostInfo(
+ override val host = HostInfo(
resource.attributes[HOST_ID]!!,
resource.attributes[HOST_NAME] ?: "",
resource.attributes[HOST_ARCH] ?: "",
@@ -294,111 +303,127 @@ public class ComputeMetricAggregator {
resource.attributes[HOST_MEM_CAPACITY] ?: 0,
)
- private var timestamp = Long.MIN_VALUE
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ @JvmField var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ @JvmField var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ @JvmField var _guestsError = 0
+
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ @JvmField var _guestsInvalid = 0
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ @JvmField var _cpuLimit = 0.0
- @JvmField var guestsTerminated = 0
- @JvmField var guestsRunning = 0
- @JvmField var guestsError = 0
- @JvmField var guestsInvalid = 0
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ @JvmField var _cpuUsage = 0.0
- @JvmField var cpuLimit = 0.0
- @JvmField var cpuUsage = 0.0
- @JvmField var cpuDemand = 0.0
- @JvmField var cpuUtilization = 0.0
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ @JvmField var _cpuDemand = 0.0
- @JvmField var cpuActiveTime = 0L
- @JvmField var cpuIdleTime = 0L
- @JvmField var cpuStealTime = 0L
- @JvmField var cpuLostTime = 0L
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ @JvmField var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ @JvmField var _cpuActiveTime = 0L
private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ @JvmField var _cpuIdleTime = 0L
private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ @JvmField var _cpuStealTime = 0L
private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ @JvmField var _cpuLostTime = 0L
private var previousCpuLostTime = 0L
- @JvmField var powerUsage = 0.0
- @JvmField var powerTotal = 0.0
+ override val powerUsage: Double
+ get() = _powerUsage
+ @JvmField var _powerUsage = 0.0
+
+ override val powerTotal: Double
+ get() = _powerTotal - previousPowerTotal
+ @JvmField var _powerTotal = 0.0
private var previousPowerTotal = 0.0
- @JvmField var uptime = 0L
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ @JvmField var _uptime = 0L
private var previousUptime = 0L
- @JvmField var downtime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ @JvmField var _downtime = 0L
private var previousDowntime = 0L
- @JvmField var bootTime = Long.MIN_VALUE
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ @JvmField var _bootTime: Instant? = null
/**
* Finish the aggregation for this cycle.
*/
- fun collect(): HostData {
- val now = Instant.ofEpochMilli(timestamp)
- val data = toHostData(now)
-
+ fun reset() {
// Reset intermediate state for next aggregation
- previousCpuActiveTime = cpuActiveTime
- previousCpuIdleTime = cpuIdleTime
- previousCpuStealTime = cpuStealTime
- previousCpuLostTime = cpuLostTime
- previousPowerTotal = powerTotal
- previousUptime = uptime
- previousDowntime = downtime
-
- guestsTerminated = 0
- guestsRunning = 0
- guestsError = 0
- guestsInvalid = 0
-
- cpuLimit = 0.0
- cpuUsage = 0.0
- cpuDemand = 0.0
- cpuUtilization = 0.0
-
- powerUsage = 0.0
-
- return data
- }
-
- /**
- * Convert the aggregator state to an immutable [HostData] instance.
- */
- private fun toHostData(now: Instant): HostData {
- return HostData(
- now,
- host,
- guestsTerminated,
- guestsRunning,
- guestsError,
- guestsInvalid,
- cpuLimit,
- cpuUsage,
- cpuDemand,
- cpuUtilization,
- cpuActiveTime - previousCpuActiveTime,
- cpuIdleTime - previousCpuIdleTime,
- cpuStealTime - previousCpuStealTime,
- cpuLostTime - previousCpuLostTime,
- powerUsage,
- powerTotal - previousPowerTotal,
- uptime - previousUptime,
- downtime - previousDowntime,
- if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
- )
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousPowerTotal = _powerTotal
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerUsage = 0.0
}
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
/**
* An aggregator for server metrics before they are reported.
*/
- internal class ServerAggregator(attributes: Attributes) {
+ internal class ServerAggregator(attributes: Attributes) : ServerTableReader {
/**
* The static information about this server.
*/
- val server = ServerInfo(
+ override val server = ServerInfo(
attributes[ResourceAttributes.HOST_ID]!!,
attributes[ResourceAttributes.HOST_NAME]!!,
attributes[ResourceAttributes.HOST_TYPE]!!,
@@ -412,70 +437,76 @@ public class ComputeMetricAggregator {
/**
* The [HostInfo] of the host on which the server is hosted.
*/
- @JvmField var host: HostInfo? = null
+ override val host: HostInfo?
+ get() = _host
+ @JvmField var _host: HostInfo? = null
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
- private var timestamp = Long.MIN_VALUE
- @JvmField var uptime: Long = 0
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ @JvmField var _uptime: Long = 0
private var previousUptime = 0L
- @JvmField var downtime: Long = 0
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ @JvmField var _downtime: Long = 0
private var previousDowntime = 0L
- @JvmField var bootTime: Long = 0
- @JvmField var schedulingLatency = 0L
- @JvmField var cpuLimit = 0.0
- @JvmField var cpuActiveTime = 0L
- @JvmField var cpuIdleTime = 0L
- @JvmField var cpuStealTime = 0L
- @JvmField var cpuLostTime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ @JvmField var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ @JvmField var _bootTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ @JvmField var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ @JvmField var _cpuActiveTime = 0L
private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ @JvmField var _cpuIdleTime = 0L
private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ @JvmField var _cpuStealTime = 0L
private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ @JvmField var _cpuLostTime = 0L
private var previousCpuLostTime = 0L
/**
* Finish the aggregation for this cycle.
*/
- fun collect(): ServerData {
- val now = Instant.ofEpochMilli(timestamp)
- val data = toServerData(now)
-
- previousUptime = uptime
- previousDowntime = downtime
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
-            previousCpuActiveTime = cpuActiveTime
-            previousCpuIdleTime = cpuIdleTime
-            previousCpuStealTime = cpuStealTime
-            previousCpuLostTime = cpuLostTime
+            previousCpuActiveTime = _cpuActiveTime
+            previousCpuIdleTime = _cpuIdleTime
+            previousCpuStealTime = _cpuStealTime
+            previousCpuLostTime = _cpuLostTime
- host = null
- cpuLimit = 0.0
-
- return data
- }
-
- /**
- * Convert the aggregator state into an immutable [ServerData].
- */
- private fun toServerData(now: Instant): ServerData {
- return ServerData(
- now,
- server,
- host,
- uptime - previousUptime,
- downtime - previousDowntime,
- if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null,
- schedulingLatency,
- cpuLimit,
- cpuActiveTime - previousCpuActiveTime,
- cpuIdleTime - previousCpuIdleTime,
- cpuStealTime - previousCpuStealTime,
- cpuLostTime - previousCpuLostTime
- )
+ _host = null
+ _cpuLimit = 0.0
}
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
index d51bcab4..64b5f337 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -22,26 +22,26 @@
package org.opendc.telemetry.compute
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
/**
* A monitor that tracks the metrics and events of the OpenDC Compute service.
*/
public interface ComputeMonitor {
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: ServerData) {}
+ public fun record(reader: ServerTableReader) {}
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: HostData) {}
+ public fun record(reader: HostTableReader) {}
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: ServiceData) {}
+ public fun record(reader: ServiceTableReader) {}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index ce89061b..41315b15 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -24,6 +24,8 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
+import org.opendc.telemetry.compute.table.toServiceData
/**
* Collect the metrics of the compute service.
@@ -32,8 +34,8 @@ public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData {
lateinit var serviceData: ServiceData
val agg = ComputeMetricAggregator()
val monitor = object : ComputeMonitor {
- override fun record(data: ServiceData) {
- serviceData = data
+ override fun record(reader: ServiceTableReader) {
+ serviceData = reader.toServiceData()
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt
new file mode 100644
index 00000000..1e1ad94e
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a host trace entry.
+ */
+public interface HostTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [HostInfo] of the host to which the row belongs to.
+ */
+ public val host: HostInfo
+
+ /**
+ * The number of guests that are in a terminated state.
+ */
+ public val guestsTerminated: Int
+
+ /**
+ * The number of guests that are in a running state.
+ */
+ public val guestsRunning: Int
+
+ /**
+ * The number of guests that are in an error state.
+ */
+ public val guestsError: Int
+
+ /**
+ * The number of guests that are in an unknown state.
+ */
+ public val guestsInvalid: Int
+
+ /**
+ * The capacity of the CPUs in the host (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The usage of all CPUs in the host (in MHz).
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The demand of all vCPUs of the guests (in MHz)
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU utilization of the host.
+ */
+ public val cpuUtilization: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the host.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the host.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+
+ /**
+ * The current power usage of the host in W.
+ */
+ public val powerUsage: Double
+
+ /**
+ * The total power consumption of the host since last time in J.
+ */
+ public val powerTotal: Double
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the host booted.
+ */
+ public val bootTime: Instant?
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt
new file mode 100644
index 00000000..c23d1467
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a server trace entry.
+ */
+public interface ServerTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [ServerInfo] of the server to which the row belongs to.
+ */
+ public val server: ServerInfo
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted or `null` if it has no host.
+ */
+ public val host: HostInfo?
+
+ /**
+ * The uptime of the server since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the server since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the server was enqueued for the scheduler.
+ */
+ public val provisionTime: Instant?
+
+ /**
+ * The [Instant] at which the server booted.
+ */
+ public val bootTime: Instant?
+
+ /**
+ * The capacity of the CPUs of the server (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the server.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the server.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index 6db1399d..39bf96f4 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -37,3 +37,10 @@ public data class ServiceData(
val attemptsFailure: Int,
val attemptsError: Int
)
+
+/**
+ * Convert a [ServiceTableReader] into a persistent object.
+ */
+public fun ServiceTableReader.toServiceData(): ServiceData {
+ return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt
index c79f0584..908f6748 100644
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt
@@ -20,26 +20,51 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.util
+package org.opendc.telemetry.compute.table
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
+import java.time.Instant
/**
- * Test suite for the [PerformanceInterferenceReader] class.
+ * An interface that is used to read a row of a service trace entry.
*/
-class PerformanceInterferenceReaderTest {
- @Test
- fun testSmoke() {
- val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
- val result = PerformanceInterferenceReader().read(input)
-
- assertAll(
- { assertEquals(2, result.size) },
- { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
- { assertEquals(0.0, result[0].targetLoad, 0.001) },
- { assertEquals(0.8830158730158756, result[0].score, 0.001) }
- )
- }
+public interface ServiceTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The number of hosts that are up at this instant.
+ */
+ public val hostsUp: Int
+
+ /**
+ * The number of hosts that are down at this instant.
+ */
+ public val hostsDown: Int
+
+ /**
+ * The number of servers that are pending to be scheduled.
+ */
+ public val serversPending: Int
+
+ /**
+ * The number of servers that are currently active.
+ */
+ public val serversActive: Int
+
+ /**
+ * The scheduling attempts that were successful.
+ */
+ public val attemptsSuccess: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to client error.
+ */
+ public val attemptsFailure: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to scheduler error.
+ */
+ public val attemptsError: Int
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 59308e11..a1bc869e 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -32,7 +32,7 @@ import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -131,7 +131,7 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.info { "Constructing performance interference model" }
val workloadLoader = ComputeWorkloadLoader(tracePath)
- val interferenceGroups = let {
+ val interferenceModel = let {
val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json")
val operational = scenario.operationalPhenomena
val enabled = operational.performanceInterferenceEnabled
@@ -140,15 +140,14 @@ class RunnerCli : CliktCommand(name = "runner") {
return@let null
}
- PerformanceInterferenceReader().read(path.inputStream())
+ VmInterferenceModelReader().read(path.inputStream())
}
val targets = portfolio.targets
val results = (0 until targets.repeatsPerScenario).map { repeat ->
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
- val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
- runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel)
+ runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong()))
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
index 7913660d..d39a0c74 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
@@ -24,8 +24,8 @@ package org.opendc.web.runner
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
import kotlin.math.max
import kotlin.math.roundToLong
@@ -33,24 +33,24 @@ import kotlin.math.roundToLong
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
class WebComputeMetricExporter : ComputeMetricExporter() {
- override fun record(data: HostData) {
- val slices = data.downtime / SLICE_LENGTH
+ override fun record(reader: HostTableReader) {
+ val slices = reader.downtime / SLICE_LENGTH
hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalActiveTime + data.cpuActiveTime,
- hostAggregateMetrics.totalIdleTime + data.cpuIdleTime,
- hostAggregateMetrics.totalStealTime + data.cpuStealTime,
- hostAggregateMetrics.totalLostTime + data.cpuLostTime,
- hostAggregateMetrics.totalPowerDraw + data.powerTotal,
+ hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime,
+ hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime,
+ hostAggregateMetrics.totalStealTime + reader.cpuStealTime,
+ hostAggregateMetrics.totalLostTime + reader.cpuLostTime,
+ hostAggregateMetrics.totalPowerDraw + reader.powerTotal,
hostAggregateMetrics.totalFailureSlices + slices,
- hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices
+ hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices
)
- hostMetrics.compute(data.host.id) { _, prev ->
+ hostMetrics.compute(reader.host.id) { _, prev ->
HostMetrics(
- data.cpuUsage + (prev?.cpuUsage ?: 0.0),
- data.cpuDemand + (prev?.cpuDemand ?: 0.0),
- data.guestsRunning + (prev?.instanceCount ?: 0),
+ reader.cpuUsage + (prev?.cpuUsage ?: 0.0),
+ reader.cpuDemand + (prev?.cpuDemand ?: 0.0),
+ reader.guestsRunning + (prev?.instanceCount ?: 0),
1 + (prev?.count ?: 0)
)
}
@@ -79,13 +79,13 @@ class WebComputeMetricExporter : ComputeMetricExporter() {
private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics()
- override fun record(data: ServiceData) {
+ override fun record(reader: ServiceTableReader) {
serviceMetrics = AggregateServiceMetrics(
- max(data.attemptsSuccess, serviceMetrics.vmTotalCount),
- max(data.serversPending, serviceMetrics.vmWaitingCount),
- max(data.serversActive, serviceMetrics.vmActiveCount),
+ max(reader.attemptsSuccess, serviceMetrics.vmTotalCount),
+ max(reader.serversPending, serviceMetrics.vmWaitingCount),
+ max(reader.serversActive, serviceMetrics.vmActiveCount),
max(0, serviceMetrics.vmInactiveCount),
- max(data.attemptsFailure, serviceMetrics.vmFailedCount),
+ max(reader.attemptsFailure, serviceMetrics.vmFailedCount),
)
}