summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt2
-rw-r--r--opendc-compute/opendc-compute-service/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt10
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt154
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt18
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt64
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt50
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt)23
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt)8
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt)9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt)15
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt87
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt407
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt543
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt36
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt65
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt55
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt44
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt35
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt350
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt38
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt103
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt208
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt111
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts47
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt86
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt)15
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt199
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt221
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt62
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt39
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt70
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt49
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt71
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt145
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt102
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt95
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt66
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt66
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt143
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt61
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt37
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt48
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt33
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt36
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt68
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt45
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json22
53 files changed, 3887 insertions, 463 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
index baa1ba2f..577fbc73 100644
--- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
+++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
@@ -45,7 +45,7 @@ public interface ComputeClient : AutoCloseable {
*
* @param name The name of the flavor.
* @param cpuCount The amount of CPU cores for this flavor.
- * @param memorySize The size of the memory.
+ * @param memorySize The size of the memory in MB.
* @param labels The identifying labels of the image.
* @param meta The non-identifying meta-data of the image.
*/
diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts
index e0e48b0f..33cafc45 100644
--- a/opendc-compute/opendc-compute-service/build.gradle.kts
+++ b/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
api(projects.opendcTelemetry.opendcTelemetryApi)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.opentelemetry.semconv)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 1873eb99..2a1fbaa0 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -23,11 +23,13 @@
package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
* @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
scheduler: ComputeScheduler,
- schedulingQuantum: Long = 300000,
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
index 5632a55e..fc092a3f 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
@@ -25,7 +25,7 @@ package org.opendc.compute.service.driver
/**
* Describes the static machine properties of the host.
*
- * @property vcpuCount The number of logical processing cores available for this host.
+ * @property cpuCount The number of logical processing cores available for this host.
* @property memorySize The amount of memory available for this host in MB.
*/
public data class HostModel(public val cpuCount: Int, public val memorySize: Long)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 8af5f86e..57e70fcd 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,7 +22,10 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -33,6 +36,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -40,15 +44,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use.
- * @param clock The clock instance to keep track of time.
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
+ * @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Long
+ private val schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -61,6 +68,11 @@ internal class ComputeServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the [ComputeService].
+ */
+ private val meter = meterProvider.get("org.opendc.compute.service")
+
+ /**
* The [Random] instance used to generate unique identifiers for the objects.
*/
private val random = Random(0)
@@ -104,60 +116,37 @@ internal class ComputeServiceImpl(
private var maxMemory = 0L
/**
- * The number of servers that have been submitted to the service for provisioning.
- */
- private val _submittedServers = meter.longCounterBuilder("servers.submitted")
- .setDescription("Number of start requests")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that failed to be scheduled.
- */
- private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled")
- .setDescription("Number of unscheduled servers")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting")
- .setDescription("Number of servers waiting to be provisioned")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
+ * The number of scheduling attempts.
*/
- private val _runningServers = meter.longUpDownCounterBuilder("servers.active")
- .setDescription("Number of servers currently running")
+ private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts")
+ .setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
+ private val _schedulingAttemptsSuccess = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
+ private val _schedulingAttemptsFailure = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
+ private val _schedulingAttemptsError = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
/**
- * The number of servers that have finished running.
+ * The response time of the service.
*/
- private val _finishedServers = meter.longCounterBuilder("servers.finished")
- .setDescription("Number of servers that finished running")
- .setUnit("1")
+ private val _schedulingLatency = meter.histogramBuilder("scheduler.latency")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
.build()
/**
- * The number of hosts registered at the compute service.
+ * The number of servers that are pending.
*/
- private val _hostCount = meter.longUpDownCounterBuilder("hosts.total")
- .setDescription("Number of hosts")
- .setUnit("1")
- .build()
-
- /**
- * The number of available hosts registered at the compute service.
- */
- private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available")
- .setDescription("Number of available hosts")
+ private val _servers = meter.upDownCounterBuilder("scheduler.servers")
+ .setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
+ private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
+ private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -170,6 +159,22 @@ internal class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
+ init {
+ val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
+ val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
+
+ meter.upDownCounterBuilder("scheduler.hosts")
+ .setDescription("Number of hosts registered with the scheduler")
+ .setUnit("1")
+ .buildWithCallback { result ->
+ val total = hostCount
+ val available = availableHosts.size.toLong()
+
+ result.observe(available, upState)
+ result.observe(total - available, downState)
+ }
+ }
+
override fun newClient(): ComputeClient {
check(scope.isActive) { "Service is already closed" }
return object : ComputeClient {
@@ -297,24 +302,19 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
- _availableHostCount.add(1)
availableHosts += hv
}
scheduler.addHost(hv)
- _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
val view = hostToView.remove(host)
if (view != null) {
- if (availableHosts.remove(view)) {
- _availableHostCount.add(-1)
- }
+ availableHosts.remove(view)
scheduler.removeHost(view)
host.removeListener(this)
- _hostCount.add(-1)
}
}
@@ -325,10 +325,9 @@ internal class ComputeServiceImpl(
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
- val request = SchedulingRequest(server)
+ val request = SchedulingRequest(server, clock.millis())
queue.add(request)
- _submittedServers.add(1)
- _waitingServers.add(1)
+ _serversPending.add(1)
requestSchedulingCycle()
return request
}
@@ -354,10 +353,12 @@ internal class ComputeServiceImpl(
return
}
+ val quantum = schedulingQuantum.toMillis()
+
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+ val delay = quantum - (clock.millis() % quantum)
timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
@@ -368,12 +369,13 @@ internal class ComputeServiceImpl(
* Run a single scheduling iteration.
*/
private fun doSchedule() {
+ val now = clock.millis()
while (queue.isNotEmpty()) {
val request = queue.peek()
if (request.isCancelled) {
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
continue
}
@@ -385,12 +387,12 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _waitingServers.add(-1)
- _unscheduledServers.add(1)
+ _serversPending.add(-1)
+ _schedulingAttemptsFailure.add(1)
- logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
- server.state = ServerState.ERROR
+ server.state = ServerState.TERMINATED
continue
} else {
break
@@ -401,7 +403,8 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
+ _schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -416,12 +419,17 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
+
+ _serversActive.add(1)
+ _schedulingAttemptsSuccess.add(1)
} catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ logger.error(e) { "Failed to deploy VM" }
hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+
+ _schedulingAttemptsError.add(1)
}
}
}
@@ -430,7 +438,7 @@ internal class ComputeServiceImpl(
/**
* A request to schedule an [InternalServer] onto one of the [Host]s.
*/
- internal data class SchedulingRequest(val server: InternalServer) {
+ internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) {
/**
* A flag to indicate that the request is cancelled.
*/
@@ -440,24 +448,22 @@ internal class ComputeServiceImpl(
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host]
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
- _availableHostCount.add(1)
}
// Re-schedule on the new machine
requestSchedulingCycle()
}
HostState.DOWN -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
availableHosts -= hv
- _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -475,14 +481,12 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.RUNNING) {
- _runningServers.add(1)
- } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
- logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- activeServers -= server
- _runningServers.add(-1)
- _finishedServers.add(1)
+ if (activeServers.remove(server) != null) {
+ _serversActive.add(-1)
+ }
val hv = hostToView[host]
if (hv != null) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index d9d0f3fc..05a7e1bf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -22,6 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
@@ -50,6 +53,21 @@ internal class InternalServer(
private val watchers = mutableListOf<ServerWatcher>()
/**
+ * The attributes of a server.
+ */
+ internal val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, name)
+ .put(ResourceAttributes.HOST_ID, uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString())
+ .build()
+
+ /**
* The [Host] that has been assigned to host the server.
*/
internal var host: Host? = null
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
index 0fd5b2a4..8c2d4715 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
@@ -26,6 +26,8 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
+import java.util.*
+import kotlin.math.min
/**
* A [ComputeScheduler] implementation that uses filtering and weighing passes to select
@@ -33,13 +35,27 @@ import org.opendc.compute.service.scheduler.weights.HostWeigher
*
* This implementation is based on the filter scheduler from OpenStack Nova.
* See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html
+ *
+ * @param filters The list of filters to apply when searching for an appropriate host.
+ * @param weighers The list of weighers to apply when searching for an appropriate host.
+ * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen.
+ * @param random A [Random] instance for selecting
*/
-public class FilterScheduler(private val filters: List<HostFilter>, private val weighers: List<Pair<HostWeigher, Double>>) : ComputeScheduler {
+public class FilterScheduler(
+ private val filters: List<HostFilter>,
+ private val weighers: List<HostWeigher>,
+ private val subsetSize: Int = 1,
+ private val random: Random = Random(0)
+) : ComputeScheduler {
/**
* The pool of hosts available to the scheduler.
*/
private val hosts = mutableListOf<HostView>()
+ init {
+ require(subsetSize >= 1) { "Subset size must be one or greater" }
+ }
+
override fun addHost(host: HostView) {
hosts.add(host)
}
@@ -49,18 +65,44 @@ public class FilterScheduler(private val filters: List<HostFilter>, private val
}
override fun select(server: Server): HostView? {
- return hosts.asSequence()
- .filter { host ->
- for (filter in filters) {
- if (!filter.test(host, server))
- return@filter false
+ val hosts = hosts
+ val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } }
+
+ val subset = if (weighers.isNotEmpty()) {
+ val results = weighers.map { it.getWeights(filteredHosts, server) }
+ val weights = DoubleArray(filteredHosts.size)
+
+ for (result in results) {
+ val min = result.min
+ val range = (result.max - min)
+
+ // Skip result if all weights are the same
+ if (range == 0.0) {
+ continue
}
- true
- }
- .sortedByDescending { host ->
- weighers.sumByDouble { (weigher, factor) -> weigher.getWeight(host, server) * factor }
+ val multiplier = result.multiplier
+ val factor = multiplier / range
+
+ for ((i, weight) in result.weights.withIndex()) {
+ weights[i] += factor * (weight - min)
+ }
}
- .firstOrNull()
+
+ weights.indices
+ .asSequence()
+ .sortedByDescending { weights[it] }
+ .map { filteredHosts[it] }
+ .take(subsetSize)
+ .toList()
+ } else {
+ filteredHosts
+ }
+
+ return when (val maxSize = min(subsetSize, subset.size)) {
+ 0 -> null
+ 1 -> subset[0]
+ else -> subset[random.nextInt(maxSize)]
+ }
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
new file mode 100644
index 00000000..a470a453
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.filters
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host.
+ *
+ * @param allocationRatio Virtual RAM to physical RAM allocation ratio.
+ */
+public class RamFilter(private val allocationRatio: Double) : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ val requested = server.flavor.memorySize
+ val available = host.availableMemory
+ val total = host.host.model.memorySize
+
+ // Do not allow an instance to overcommit against itself, only against
+ // other instances.
+ if (requested > total) {
+ return false
+ }
+
+ val limit = total * allocationRatio
+ val used = total - available
+ val usable = limit - used
+ return usable >= requested
+ }
+}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
index 072440c5..abdd79f1 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
@@ -26,15 +26,22 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server
- * flavor.
+ * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ComputeCapabilitiesFilter : HostFilter {
+public class VCpuFilter(private val allocationRatio: Double) : HostFilter {
override fun test(host: HostView, server: Server): Boolean {
- val fitsMemory = host.availableMemory >= server.flavor.memorySize
- val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount
- return fitsMemory && fitsCpu
- }
+ val requested = server.flavor.cpuCount
+ val total = host.host.model.cpuCount
+ val limit = total * allocationRatio
- override fun toString(): String = "ComputeCapabilitiesFilter"
+ // Do not allow an instance to overcommit against itself, only against other instances
+ if (requested > total) {
+ return false
+ }
+
+ val free = limit - host.provisionedCores
+ return free >= requested
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
index 12e6510e..d668fdaf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
@@ -27,11 +27,15 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the available memory per core on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available core memory, and a negative number will result in the scheduler preferring hosts with less available core
+ * memory.
*/
-public class CoreMemoryWeigher : HostWeigher {
+public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble() / host.host.model.cpuCount
}
- override fun toString(): String = "CoreMemoryWeigher"
+ override fun toString(): String = "CoreRamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
index d48ee9e0..aca8c4e6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
@@ -29,9 +29,47 @@ import org.opendc.compute.service.scheduler.FilterScheduler
/**
* An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request.
*/
-public fun interface HostWeigher {
+public interface HostWeigher {
+ /**
+ * The multiplier for the weigher.
+ */
+ public val multiplier: Double
+
/**
* Obtain the weight of the specified [host] when scheduling the specified [server].
*/
public fun getWeight(host: HostView, server: Server): Double
+
+ /**
+ * Obtain the weights for [hosts] when scheduling the specified [server].
+ */
+ public fun getWeights(hosts: List<HostView>, server: Server): Result {
+ val weights = DoubleArray(hosts.size)
+ var min = Double.MAX_VALUE
+ var max = Double.MIN_VALUE
+
+ for ((i, host) in hosts.withIndex()) {
+ val weight = getWeight(host, server)
+ weights[i] = weight
+ min = kotlin.math.min(min, weight)
+ max = kotlin.math.max(max, weight)
+ }
+
+ return Result(weights, min, max, multiplier)
+ }
+
+ /**
+ * A result returned by the weigher.
+ *
+ * @param weights The weights returned by the weigher.
+ * @param min The minimum weight returned.
+ * @param max The maximum weight returned.
+ * @param multiplier The weight multiplier to use.
+ */
+ public class Result(
+ public val weights: DoubleArray,
+ public val min: Double,
+ public val max: Double,
+ public val multiplier: Double,
+ )
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
index 2ef733e5..732cbe03 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
@@ -28,7 +28,7 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the number of instances on the host.
*/
-public class InstanceCountWeigher : HostWeigher {
+public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.instanceCount.toDouble()
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
index 115d8e4d..d18d31f4 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
@@ -26,12 +26,15 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the available memory on the host.
+ * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available memory, and a negative number will result in the scheduler preferring hosts with less memory.
*/
-public class MemoryWeigher : HostWeigher {
+public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble()
}
- override fun toString(): String = "MemoryWeigher"
+ override fun toString(): String = "RamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
index df5bcd6e..4a22269b 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
@@ -26,12 +26,19 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host.
+ * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ProvisionedCoresWeigher : HostWeigher {
+public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher {
+
+ init {
+ require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" }
+ }
+
override fun getWeight(host: HostView, server: Server): Double {
- return host.provisionedCores.toDouble()
+ return host.host.model.cpuCount * allocationRatio - host.provisionedCores
}
- override fun toString(): String = "ProvisionedCoresWeigher"
+ override fun toString(): String = "VCpuWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index a6258845..564f9493 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -37,9 +37,10 @@ import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostModel
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.MemoryWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
import java.util.*
@@ -57,11 +58,10 @@ internal class ComputeServiceTest {
scope = SimulationCoroutineScope()
val clock = scope.clock
val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
+ weighers = listOf(RamWeigher())
)
- val meter = MeterProvider.noop().get("opendc-compute")
- service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
+ service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler)
}
@Test
@@ -167,9 +167,9 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -180,9 +180,9 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -193,9 +193,9 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -207,7 +207,7 @@ internal class ComputeServiceTest {
server.start()
server.stop()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.TERMINATED, server.state)
}
@@ -228,7 +228,7 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(10 * 60 * 1000)
+ delay(10L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
@@ -254,12 +254,12 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
every { host.state } returns HostState.UP
listeners.forEach { it.onStateChanged(host, HostState.UP) }
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
@@ -284,13 +284,13 @@ internal class ComputeServiceTest {
val image = client.newImage("test")
val server = client.newServer("test", image, flavor, start = false)
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
every { host.state } returns HostState.DOWN
listeners.forEach { it.onStateChanged(host, HostState.DOWN) }
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
@@ -344,7 +344,7 @@ internal class ComputeServiceTest {
// Start server
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
coVerify { host.spawn(capture(slot), true) }
listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) }
@@ -383,7 +383,7 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 20ea8d20..dfd3bc67 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -47,8 +47,9 @@ class InternalServerTest {
fun testEquality() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
+
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
@@ -59,8 +60,8 @@ class InternalServerTest {
fun testEqualityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -73,8 +74,8 @@ class InternalServerTest {
fun testInequalityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -87,8 +88,8 @@ class InternalServerTest {
fun testInequalityWithIncorrectType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
assertNotEquals(a, Unit)
@@ -98,11 +99,11 @@ class InternalServerTest {
fun testStartTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) }
+ every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
server.start()
@@ -114,8 +115,8 @@ class InternalServerTest {
fun testStartDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -127,8 +128,8 @@ class InternalServerTest {
fun testStartProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.PROVISIONING
@@ -142,8 +143,8 @@ class InternalServerTest {
fun testStartRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.RUNNING
@@ -157,10 +158,10 @@ class InternalServerTest {
fun testStopProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
@@ -175,8 +176,8 @@ class InternalServerTest {
fun testStopTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -189,8 +190,8 @@ class InternalServerTest {
fun testStopDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -203,8 +204,8 @@ class InternalServerTest {
fun testStopRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -220,10 +221,10 @@ class InternalServerTest {
fun testDeleteProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
@@ -239,8 +240,8 @@ class InternalServerTest {
fun testDeleteTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -255,8 +256,8 @@ class InternalServerTest {
fun testDeleteDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -269,8 +270,8 @@ class InternalServerTest {
fun testDeleteRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -282,4 +283,20 @@ class InternalServerTest {
coVerify { host.delete(server) }
verify { service.delete(server) }
}
+
+ private fun mockFlavor(): InternalFlavor {
+ val flavor = mockk<InternalFlavor>()
+ every { flavor.name } returns "c5.large"
+ every { flavor.uid } returns UUID.randomUUID()
+ every { flavor.cpuCount } returns 2
+ every { flavor.memorySize } returns 4096
+ return flavor
+ }
+
+ private fun mockImage(): InternalImage {
+ val image = mockk<InternalImage>()
+ every { image.name } returns "ubuntu-20.04"
+ every { image.uid } returns UUID.randomUUID()
+ return image
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
new file mode 100644
index 00000000..cafd4498
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
@@ -0,0 +1,407 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler
+
+import io.mockk.every
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.driver.HostModel
+import org.opendc.compute.service.driver.HostState
+import org.opendc.compute.service.internal.HostView
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.InstanceCountFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Test suite for the [FilterScheduler].
+ */
+internal class FilterSchedulerTest {
+ @Test
+ fun testInvalidSubsetSize() {
+ assertThrows<IllegalArgumentException> {
+ FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = 0
+ )
+ }
+
+ assertThrows<IllegalArgumentException> {
+ FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = -2
+ )
+ }
+ }
+
+ @Test
+ fun testNoHosts() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ )
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testNoFiltersAndSchedulers() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.DOWN
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ // Make sure we get the first host both times
+ assertAll(
+ { assertEquals(hostA, scheduler.select(server)) },
+ { assertEquals(hostA, scheduler.select(server)) }
+ )
+ }
+
+ @Test
+ fun testNoFiltersAndSchedulersRandom() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(1)
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.DOWN
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ // Make sure we get the first host both times
+ assertAll(
+ { assertEquals(hostB, scheduler.select(server)) },
+ { assertEquals(hostA, scheduler.select(server)) }
+ )
+ }
+
+ @Test
+ fun testHostIsDown() {
+ val scheduler = FilterScheduler(
+ filters = listOf(ComputeFilter()),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.DOWN
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testHostIsUp() {
+ val scheduler = FilterScheduler(
+ filters = listOf(ComputeFilter()),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(host, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(RamFilter(1.0)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.availableMemory } returns 512
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 2048
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamFilterOvercommit() {
+ val scheduler = FilterScheduler(
+ filters = listOf(RamFilter(1.5)),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+ every { host.host.model } returns HostModel(4, 2048)
+ every { host.availableMemory } returns 2048
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 2300
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuFilter(1.0)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.provisionedCores } returns 3
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.provisionedCores } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuFilterOvercommit() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuFilter(16.0)),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+ every { host.host.model } returns HostModel(4, 2048)
+ every { host.provisionedCores } returns 0
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 8
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testInstanceCountFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(InstanceCountFilter(limit = 2)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.instanceCount } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.instanceCount } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(RamWeigher(1.5)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.availableMemory } returns 1024
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostA, scheduler.select(server))
+ }
+
+ @Test
+ fun testCoreRamWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(CoreRamWeigher(1.5)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(12, 2048)
+ every { hostA.availableMemory } returns 1024
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(VCpuWeigher(16.0)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.provisionedCores } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.provisionedCores } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testInstanceCountWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.instanceCount } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.instanceCount } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index b31a2114..aaf69f78 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -33,11 +33,13 @@ dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcCompute.opendcComputeService)
api(projects.opendcSimulator.opendcSimulatorCompute)
- api(projects.opendcSimulator.opendcSimulatorFailures)
+ api(libs.commons.math3)
implementation(projects.opendcUtils)
+ implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ testImplementation(projects.opendcTelemetry.opendcTelemetryCompute)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 68667a8c..b9d02185 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,29 +22,34 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.common.Labels
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.ScalingDriver
-import org.opendc.simulator.compute.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.kernel.SimHypervisor
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
+import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
-import org.opendc.simulator.compute.power.PowerModel
-import org.opendc.simulator.failures.FailureDomain
-import java.time.Clock
+import org.opendc.simulator.compute.power.PowerDriver
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -52,34 +57,27 @@ import kotlin.coroutines.resume
public class SimHost(
override val uid: UUID,
override val name: String,
- model: SimMachineModel,
+ model: MachineModel,
override val meta: Map<String, Any>,
context: CoroutineContext,
- clock: Clock,
- meter: Meter,
- hypervisor: SimHypervisorProvider,
- scalingGovernor: ScalingGovernor,
- scalingDriver: ScalingDriver,
+ engine: FlowEngine,
+ meterProvider: MeterProvider,
+ hypervisorProvider: SimHypervisorProvider,
+ scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
+ powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
-) : Host, FailureDomain, AutoCloseable {
-
- public constructor(
- uid: UUID,
- name: String,
- model: SimMachineModel,
- meta: Map<String, Any>,
- context: CoroutineContext,
- clock: Clock,
- meter: Meter,
- hypervisor: SimHypervisorProvider,
- powerModel: PowerModel = ConstantPowerModel(0.0),
- mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
- ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimpleScalingDriver(powerModel), mapper)
-
+ interferenceDomain: VmInterferenceDomain? = null,
+ private val optimize: Boolean = false
+) : Host, AutoCloseable {
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
- override val scope: CoroutineScope = CoroutineScope(context + Job())
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The clock instance used by the host.
+ */
+ private val clock = engine.clock
/**
* The logger instance of this server.
@@ -87,52 +85,31 @@ public class SimHost(
private val logger = KotlinLogging.logger {}
/**
- * The event listeners registered with this host.
+ * The [Meter] to track metrics of the simulated host.
*/
- private val listeners = mutableListOf<HostListener>()
+ private val meter = meterProvider.get("org.opendc.compute.simulator")
/**
- * Current total memory use of the images on this hypervisor.
+ * The event listeners registered with this host.
*/
- private var availableMemory: Long = model.memory.map { it.size }.sum()
+ private val listeners = mutableListOf<HostListener>()
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model, scalingGovernor, scalingDriver)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver)
/**
* The hypervisor to run multiple workloads.
*/
- public val hypervisor: SimHypervisor = hypervisor.create(
- scope.coroutineContext, clock,
- object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
-
- _batch.put(_cpuWork, requestedWork.toDouble())
- _batch.put(_cpuWorkGranted, grantedWork.toDouble())
- _batch.put(_cpuWorkOvercommit, overcommittedWork.toDouble())
- _batch.put(_cpuWorkInterference, interferedWork.toDouble())
- _batch.put(_cpuUsage, cpuUsage)
- _batch.put(_cpuDemand, cpuDemand)
- _batch.put(_cpuPower, machine.powerDraw)
- _batch.record()
- }
- }
- )
+ private val hypervisor: SimHypervisor = hypervisorProvider
+ .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain)
/**
* The virtual machines running on the hypervisor.
*/
private val guests = HashMap<Server, Guest>()
+ private val _guests = mutableListOf<Guest>()
override val state: HostState
get() = _state
@@ -144,123 +121,99 @@ public class SimHost(
field = value
}
- override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
-
- /**
- * The number of guests on the host.
- */
- private val _guests = meter.longUpDownCounterBuilder("guests.total")
- .setDescription("Number of guests")
- .setUnit("1")
- .build()
- .bind(Labels.of("host", uid.toString()))
-
- /**
- * The number of active guests on the host.
- */
- private val _activeGuests = meter.longUpDownCounterBuilder("guests.active")
- .setDescription("Number of active guests")
- .setUnit("1")
- .build()
- .bind(Labels.of("host", uid.toString()))
-
- /**
- * The CPU usage on the host.
- */
- private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
- .setUnit("MHz")
- .build()
-
- /**
- * The CPU demand on the host.
- */
- private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
- .setUnit("MHz")
- .build()
-
- /**
- * The requested work for the CPU.
- */
- private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage")
- .setDescription("The amount of power used by the CPU")
- .setUnit("W")
- .build()
-
- /**
- * The requested work for the CPU.
- */
- private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total")
- .setDescription("The amount of work supplied to the CPU")
- .setUnit("1")
- .build()
-
- /**
- * The work actually performed by the CPU.
- */
- private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted")
- .setDescription("The amount of work performed by the CPU")
- .setUnit("1")
- .build()
+ override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The work that could not be performed by the CPU due to overcommitting resource.
+ * The [GuestListener] that listens for guest events.
*/
- private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit")
- .setDescription("The amount of work not performed by the CPU due to overcommitment")
- .setUnit("1")
- .build()
+ private val guestListener = object : GuestListener {
+ override fun onStart(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
- /**
- * The work that could not be performed by the CPU due to interference.
- */
- private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference")
- .setDescription("The amount of work not performed by the CPU due to interference")
- .setUnit("1")
- .build()
+ override fun onStop(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
+ }
/**
- * The batch recorder used to record multiple metrics atomically.
+ * The [Job] that represents the machine running the hypervisor.
*/
- private val _batch = meter.newBatchRecorder("host", uid.toString())
+ private var _job: Job? = null
init {
- // Launch hypervisor onto machine
- scope.launch {
- try {
- _state = HostState.UP
- machine.run(this@SimHost.hypervisor, emptyMap())
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- logger.error(cause) { "Host failed" }
- throw cause
- } finally {
- _state = HostState.DOWN
- }
- }
+ launch()
+
+ meter.upDownCounterBuilder("system.guests")
+ .setDescription("Number of guests on this host")
+ .setUnit("1")
+ .buildWithCallback(::collectGuests)
+ meter.gaugeBuilder("system.cpu.limit")
+ .setDescription("Amount of CPU resources available to the host")
+ .buildWithCallback(::collectCpuLimit)
+ meter.gaugeBuilder("system.cpu.demand")
+ .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) }
+ meter.gaugeBuilder("system.cpu.usage")
+ .setDescription("Amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) }
+ meter.gaugeBuilder("system.cpu.utilization")
+ .setDescription("Utilization of the CPU resources of the host")
+ .setUnit("%")
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) }
+ meter.counterBuilder("system.cpu.time")
+ .setDescription("Amount of CPU time spent by the host")
+ .setUnit("s")
+ .buildWithCallback(::collectCpuTime)
+ meter.gaugeBuilder("system.power.usage")
+ .setDescription("Power usage of the host ")
+ .setUnit("W")
+ .buildWithCallback { result -> result.observe(machine.powerUsage) }
+ meter.counterBuilder("system.power.total")
+ .setDescription("Amount of energy used by the CPU")
+ .setUnit("J")
+ .ofDoubles()
+ .buildWithCallback { result -> result.observe(machine.energyUsage) }
+ meter.counterBuilder("system.time")
+ .setDescription("The uptime of the host")
+ .setUnit("s")
+ .buildWithCallback(::collectUptime)
+ meter.gaugeBuilder("system.time.boot")
+ .setDescription("The boot time of the host")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectBootTime)
}
override fun canFit(server: Server): Boolean {
- val sufficientMemory = availableMemory > server.flavor.memorySize
- val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount
+ val sufficientMemory = model.memorySize >= server.flavor.memorySize
+ val enoughCpus = model.cpuCount >= server.flavor.cpuCount
val canFit = hypervisor.canFit(server.flavor.toMachineModel())
return sufficientMemory && enoughCpus && canFit
}
override suspend fun spawn(server: Server, start: Boolean) {
- // Return if the server already exists on this host
- if (server in this) {
- return
+ val guest = guests.computeIfAbsent(server) { key ->
+ require(canFit(key)) { "Server does not fit" }
+
+ val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ val newGuest = Guest(
+ scope.coroutineContext,
+ clock,
+ this,
+ mapper,
+ guestListener,
+ server,
+ machine
+ )
+
+ _guests.add(newGuest)
+ newGuest
}
- require(canFit(server)) { "Server does not fit" }
- val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
- guests[server] = guest
- _guests.add(1)
-
if (start) {
guest.start()
}
@@ -281,9 +234,8 @@ public class SimHost(
}
override suspend fun delete(server: Server) {
- val guest = guests.remove(server) ?: return
- guest.terminate()
- _guests.add(-1)
+ val guest = guests[server] ?: return
+ guest.delete()
}
override fun addListener(listener: HostListener) {
@@ -295,130 +247,233 @@ public class SimHost(
}
override fun close() {
+ reset()
scope.cancel()
machine.close()
}
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
+ public suspend fun fail() {
+ reset()
+
+ for (guest in _guests) {
+ guest.fail()
+ }
+ }
+
+ public suspend fun recover() {
+ updateUptime()
+
+ launch()
+
+ // Wait for the hypervisor to launch before recovering the guests
+ yield()
+
+ for (guest in _guests) {
+ guest.recover()
+ }
+ }
+
+ /**
+ * Launch the hypervisor.
+ */
+ private fun launch() {
+ check(_job == null) { "Concurrent hypervisor running" }
+
+ // Launch hypervisor onto machine
+ _job = scope.launch {
+ try {
+ _bootTime = clock.millis()
+ _state = HostState.UP
+ machine.run(hypervisor, emptyMap())
+ } catch (_: CancellationException) {
+ // Ignored
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Host failed" }
+ throw cause
+ } finally {
+ _state = HostState.DOWN
+ }
+ }
+ }
+
+ /**
+ * Reset the machine.
+ */
+ private fun reset() {
+ updateUptime()
+
+ // Stop the hypervisor
+ val job = _job
+ if (job != null) {
+ job.cancel()
+ _job = null
+ }
+
+ _state = HostState.DOWN
+ }
+
/**
* Convert flavor to machine model.
*/
- private fun Flavor.toMachineModel(): SimMachineModel {
+ private fun Flavor.toMachineModel(): MachineModel {
val originalCpu = machine.model.cpus[0]
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
- return SimMachineModel(processingUnits, memoryUnits)
+ return MachineModel(processingUnits, memoryUnits).optimize()
}
- private fun onGuestStart(vm: Guest) {
- guests.forEach { (_, guest) ->
- if (guest.state == ServerState.RUNNING) {
- vm.performanceInterferenceModel?.onStart(vm.server.image.name)
- }
+ /**
+ * Optimize the [MachineModel] for simulation.
+ */
+ private fun MachineModel.optimize(): MachineModel {
+ if (!optimize) {
+ return this
}
- _activeGuests.add(1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
+ val originalCpu = cpus[0]
+ val freq = cpus.sumOf { it.frequency }
+ val processingNode = originalCpu.node.copy(coreCount = 1)
+ val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode))
+
+ val memorySize = memory.sumOf { it.size }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return MachineModel(processingUnits, memoryUnits)
}
- private fun onGuestStop(vm: Guest) {
- guests.forEach { (_, guest) ->
- if (guest.state == ServerState.RUNNING) {
- vm.performanceInterferenceModel?.onStop(vm.server.image.name)
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private val terminatedState = Attributes.of(STATE_KEY, "terminated")
+ private val runningState = Attributes.of(STATE_KEY, "running")
+ private val errorState = Attributes.of(STATE_KEY, "error")
+ private val invalidState = Attributes.of(STATE_KEY, "invalid")
+
+ /**
+ * Helper function to collect the guest counts on this host.
+ */
+ private fun collectGuests(result: ObservableLongMeasurement) {
+ var terminated = 0L
+ var running = 0L
+ var error = 0L
+ var invalid = 0L
+
+ val guests = _guests.listIterator()
+ for (guest in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ ServerState.DELETED -> {
+ // Remove guests that have been deleted
+ this.guests.remove(guest.server)
+ guests.remove()
+ }
+ else -> invalid++
}
}
- _activeGuests.add(-1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
+ result.observe(terminated, terminatedState)
+ result.observe(running, runningState)
+ result.observe(error, errorState)
+ result.observe(invalid, invalidState)
}
- override suspend fun fail() {
- _state = HostState.DOWN
- }
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit)
- override suspend fun recover() {
- _state = HostState.UP
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectCpuLimit(result)
+ }
}
+ private val _activeState = Attributes.of(STATE_KEY, "active")
+ private val _stealState = Attributes.of(STATE_KEY, "steal")
+ private val _lostState = Attributes.of(STATE_KEY, "lost")
+ private val _idleState = Attributes.of(STATE_KEY, "idle")
+
/**
- * A virtual machine instance that the driver manages.
+ * Helper function to track the CPU time of a machine.
*/
- private inner class Guest(val server: Server, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
+ val counters = hypervisor.counters
- var state: ServerState = ServerState.TERMINATED
+ result.observe(counters.cpuActiveTime / 1000L, _activeState)
+ result.observe(counters.cpuIdleTime / 1000L, _idleState)
+ result.observe(counters.cpuStealTime / 1000L, _stealState)
+ result.observe(counters.cpuLostTime / 1000L, _lostState)
- suspend fun start() {
- when (state) {
- ServerState.TERMINATED -> {
- logger.info { "User requested to start server ${server.uid}" }
- launch()
- }
- ServerState.RUNNING -> return
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> assert(false) { "Invalid state transition" }
- }
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectCpuTime(result)
}
+ }
- suspend fun stop() {
- when (state) {
- ServerState.RUNNING, ServerState.ERROR -> {
- val job = job ?: throw IllegalStateException("Server should be active")
- job.cancel()
- job.join()
- }
- ServerState.TERMINATED, ServerState.DELETED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+ private var _lastReport = clock.millis()
- suspend fun terminate() {
- stop()
- state = ServerState.DELETED
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun updateUptime() {
+ val now = clock.millis()
+ val duration = now - _lastReport
+ _lastReport = now
+
+ if (_state == HostState.UP) {
+ _uptime += duration
+ } else if (_state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
}
- private var job: Job? = null
-
- private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- job = scope.launch {
- delay(1) // TODO Introduce boot time
- init()
- cont.resume(Unit)
- try {
- machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- machine.close()
- job = null
- }
- }
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].updateUptime(duration)
}
+ }
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.of(STATE_KEY, "up")
+ private val _downState = Attributes.of(STATE_KEY, "down")
- private fun init() {
- state = ServerState.RUNNING
- onGuestStart(this)
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectUptime(result: ObservableLongMeasurement) {
+ updateUptime()
+
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
+
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectUptime(result)
}
+ }
+
+ private var _bootTime = Long.MIN_VALUE
- private fun exit(cause: Throwable?) {
- state =
- if (cause == null)
- ServerState.TERMINATED
- else
- ServerState.ERROR
+ /**
+ * Helper function to track the boot time of a machine.
+ */
+ private fun collectBootTime(result: ObservableLongMeasurement) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result.observe(_bootTime)
+ }
- availableMemory += server.flavor.memorySize
- onGuestStop(this)
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
new file mode 100644
index 00000000..258ccc89
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.opendc.compute.simulator.SimHost
+import java.time.Clock
+
+/**
+ * Interface responsible for applying the fault to a host.
+ */
+public interface HostFault {
+ /**
+ * Apply the fault to the specified [victims].
+ */
+ public suspend fun apply(clock: Clock, victims: List<SimHost>)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
new file mode 100644
index 00000000..5eff439f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.internal.HostFaultInjectorImpl
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * An interface for stochastically injecting faults into a set of hosts.
+ */
+public interface HostFaultInjector : AutoCloseable {
+ /**
+ * Start fault injection.
+ */
+ public fun start()
+
+ /**
+ * Stop fault injection into the system.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [HostFaultInjector].
+ *
+ * @param context The scope to run the fault injector in.
+ * @param clock The [Clock] to keep track of simulation time.
+ * @param hosts The hosts to inject the faults into.
+ * @param iat The inter-arrival time distribution of the failures (in hours).
+ * @param selector The [VictimSelector] to select the host victims.
+ * @param fault The type of [HostFault] to inject.
+ */
+ public operator fun invoke(
+ context: CoroutineContext,
+ clock: Clock,
+ hosts: Set<SimHost>,
+ iat: RealDistribution,
+ selector: VictimSelector,
+ fault: HostFault
+ ): HostFaultInjector = HostFaultInjectorImpl(context, clock, hosts, iat, selector, fault)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
new file mode 100644
index 00000000..fc7cebfc
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import kotlinx.coroutines.delay
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import java.time.Clock
+import kotlin.math.roundToLong
+
+/**
+ * A type of [HostFault] where the hosts are stopped and recover after some random amount of time.
+ */
+public class StartStopHostFault(private val duration: RealDistribution) : HostFault {
+ override suspend fun apply(clock: Clock, victims: List<SimHost>) {
+ for (host in victims) {
+ host.fail()
+ }
+
+ val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds
+
+ // Handle long overflow
+ if (clock.millis() + df <= 0) {
+ return
+ }
+
+ delay(df)
+
+ for (host in victims) {
+ host.recover()
+ }
+ }
+
+ override fun toString(): String = "StartStopHostFault[$duration]"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
new file mode 100644
index 00000000..fcd9dd7e
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import java.util.*
+import kotlin.math.roundToInt
+
+/**
+ * A [VictimSelector] that stochastically selects a set of hosts to be failed.
+ */
+public class StochasticVictimSelector(
+ private val size: RealDistribution,
+ private val random: Random = Random(0)
+) : VictimSelector {
+
+ override fun select(hosts: Set<SimHost>): List<SimHost> {
+ val n = size.sample().roundToInt()
+ return hosts.shuffled(random).take(n)
+ }
+
+ override fun toString(): String = "StochasticVictimSelector[$size]"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt
new file mode 100644
index 00000000..b5610284
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.opendc.compute.simulator.SimHost
+
+/**
+ * Interface responsible for selecting the victim(s) for fault injection.
+ */
+public interface VictimSelector {
+ /**
+ * Select the hosts from [hosts] where a fault will be injected.
+ */
+ public fun select(hosts: Set<SimHost>): List<SimHost>
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
new file mode 100644
index 00000000..5ea1860d
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -0,0 +1,350 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.common.AttributesBuilder
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.kernel.SimVirtualMachine
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A virtual machine instance that is managed by a [SimHost].
+ */
+internal class Guest(
+ context: CoroutineContext,
+ private val clock: Clock,
+ val host: SimHost,
+ private val mapper: SimWorkloadMapper,
+ private val listener: GuestListener,
+ val server: Server,
+ val machine: SimVirtualMachine
+) {
+ /**
+ * The [CoroutineScope] of the guest.
+ */
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this guest.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The state of the [Guest].
+ *
+ * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
+ * a server.
+ */
+ var state: ServerState = ServerState.TERMINATED
+
+ /**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = GuestAttributes(this)
+
+ /**
+ * Start the guest.
+ */
+ suspend fun start() {
+ when (state) {
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ doStart()
+ }
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start deleted server" }
+ throw IllegalArgumentException("Server is deleted")
+ }
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Stop the guest.
+ */
+ suspend fun stop() {
+ when (state) {
+ ServerState.RUNNING -> doStop(ServerState.TERMINATED)
+ ServerState.ERROR -> doRecover()
+ ServerState.TERMINATED, ServerState.DELETED -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Delete the guest.
+ *
+ * This operation will stop the guest if it is running on the host and remove all resources associated with the
+ * guest.
+ */
+ suspend fun delete() {
+ stop()
+
+ state = ServerState.DELETED
+
+ machine.close()
+ scope.cancel()
+ }
+
+ /**
+ * Fail the guest if it is active.
+ *
+ * This operation forcibly stops the guest and puts the server into an error state.
+ */
+ suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
+
+ doStop(ServerState.ERROR)
+ }
+
+ /**
+ * Recover the guest if it is in an error state.
+ */
+ suspend fun recover() {
+ if (state != ServerState.ERROR) {
+ return
+ }
+
+ doStart()
+ }
+
+ /**
+ * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ */
+ private var job: Job? = null
+
+ /**
+ * Launch the guest on the simulated
+ */
+ private suspend fun doStart() {
+ assert(job == null) { "Concurrent job running" }
+ val workload = mapper.createWorkload(server)
+
+ val job = scope.launch { runMachine(workload) }
+ this.job = job
+
+ state = ServerState.RUNNING
+ onStart()
+
+ job.invokeOnCompletion { cause ->
+ this.job = null
+ onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ }
+ }
+
+ /**
+ * Attempt to stop the server and put it into [target] state.
+ */
+ private suspend fun doStop(target: ServerState) {
+ assert(job != null) { "Invalid job state" }
+ val job = job ?: return
+ job.cancel()
+ job.join()
+
+ state = target
+ }
+
+ /**
+ * Attempt to recover from an error state.
+ */
+ private fun doRecover() {
+ state = ServerState.TERMINATED
+ }
+
+ /**
+ * Converge the process that models the virtual machine lifecycle as a coroutine.
+ */
+ private suspend fun runMachine(workload: SimWorkload) {
+ delay(1) // TODO Introduce model for boot time
+ machine.run(workload, mapOf("driver" to host, "server" to server))
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ _bootTime = clock.millis()
+ state = ServerState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: ServerState) {
+ state = target
+ listener.onStop(this)
+ }
+
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = attributes.toBuilder()
+ .put(STATE_KEY, "up")
+ .build()
+ private val _downState = attributes.toBuilder()
+ .put(STATE_KEY, "down")
+ .build()
+
+ /**
+ * Helper function to track the uptime and downtime of the guest.
+ */
+ fun updateUptime(duration: Long) {
+ if (state == ServerState.RUNNING) {
+ _uptime += duration
+ } else if (state == ServerState.ERROR) {
+ _downtime += duration
+ }
+ }
+
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(result: ObservableLongMeasurement) {
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
+ }
+
+ private var _bootTime = Long.MIN_VALUE
+
+ /**
+ * Helper function to track the boot time of the guest.
+ */
+ fun collectBootTime(result: ObservableLongMeasurement) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result.observe(_bootTime)
+ }
+ }
+
+ private val _activeState = attributes.toBuilder()
+ .put(STATE_KEY, "active")
+ .build()
+ private val _stealState = attributes.toBuilder()
+ .put(STATE_KEY, "steal")
+ .build()
+ private val _lostState = attributes.toBuilder()
+ .put(STATE_KEY, "lost")
+ .build()
+ private val _idleState = attributes.toBuilder()
+ .put(STATE_KEY, "idle")
+ .build()
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ fun collectCpuTime(result: ObservableLongMeasurement) {
+ val counters = machine.counters
+
+ result.observe(counters.cpuActiveTime / 1000, _activeState)
+ result.observe(counters.cpuIdleTime / 1000, _idleState)
+ result.observe(counters.cpuStealTime / 1000, _stealState)
+ result.observe(counters.cpuLostTime / 1000, _lostState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit, attributes)
+ }
+
+ /**
+ * An optimized [Attributes] implementation.
+ */
+ private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes {
+ /**
+ * Construct a [GuestAttributes] instance from a [Guest].
+ */
+ constructor(guest: Guest) : this(
+ guest.server.uid.toString(),
+ Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, guest.server.name)
+ .put(ResourceAttributes.HOST_ID, guest.server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString())
+ .build()
+ )
+
+ override fun <T : Any?> get(key: AttributeKey<T>): T? {
+ // Optimize access to the HOST_ID key which is accessed quite often
+ if (key == ResourceAttributes.HOST_ID) {
+ @Suppress("UNCHECKED_CAST")
+ return uid as T?
+ }
+ return attributes.get(key)
+ }
+
+ override fun toBuilder(): AttributesBuilder {
+ val delegate = attributes.toBuilder()
+ return object : AttributesBuilder {
+
+ override fun putAll(attributes: Attributes): AttributesBuilder {
+ delegate.putAll(attributes)
+ return this
+ }
+
+ override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder {
+ delegate.put<T>(key, value)
+ return this
+ }
+
+ override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder {
+ delegate.put(key, value)
+ return this
+ }
+
+ override fun build(): Attributes = GuestAttributes(uid, delegate.build())
+ }
+ }
+
+ override fun equals(other: Any?): Boolean = attributes == other
+
+ // Cache hash code
+ private val _hash = attributes.hashCode()
+
+ override fun hashCode(): Int = _hash
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
new file mode 100644
index 00000000..e6d0fdad
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+/**
+ * Helper interface to listen for [Guest] events.
+ */
+internal interface GuestListener {
+ /**
+ * This method is invoked when the guest machine is running.
+ */
+ fun onStart(guest: Guest)
+
+ /**
+ * This method is invoked when the guest machine is stopped.
+ */
+ fun onStop(guest: Guest)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
new file mode 100644
index 00000000..7d46e626
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import kotlinx.coroutines.*
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFault
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.VictimSelector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * Internal implementation of the [HostFaultInjector] interface.
+ *
+ * @param context The scope to run the fault injector in.
+ * @param clock The [Clock] to keep track of simulation time.
+ * @param hosts The set of hosts to inject faults into.
+ * @param iat The inter-arrival time distribution of the failures (in hours).
+ * @param selector The [VictimSelector] to select the host victims.
+ * @param fault The type of [HostFault] to inject.
+ */
+internal class HostFaultInjectorImpl(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ private val hosts: Set<SimHost>,
+ private val iat: RealDistribution,
+ private val selector: VictimSelector,
+ private val fault: HostFault
+) : HostFaultInjector {
+ /**
+ * The scope in which the injector runs.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ /**
+ * The [Job] that awaits the nearest fault in the system.
+ */
+ private var job: Job? = null
+
+ /**
+ * Start the fault injection into the system.
+ */
+ override fun start() {
+ if (job != null) {
+ return
+ }
+
+ job = scope.launch {
+ runInjector()
+ job = null
+ }
+ }
+
+ /**
+ * Converge the injection process.
+ */
+ private suspend fun runInjector() {
+ while (true) {
+ // Make sure to convert delay from hours to milliseconds
+ val d = (iat.sample() * 3.6e6).roundToLong()
+
+ // Handle long overflow
+ if (clock.millis() + d <= 0) {
+ return
+ }
+
+ delay(d)
+
+ val victims = selector.select(hosts)
+ fault.apply(clock, victims)
+ }
+ }
+
+ /**
+ * Stop the fault injector.
+ */
+ public override fun close() {
+ scope.cancel()
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 5594fd59..a0ff9228 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -23,47 +23,48 @@
package org.opendc.compute.simulator
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Image
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
-import org.opendc.simulator.compute.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.HOST_ID
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
-import java.util.UUID
+import java.time.Duration
+import java.util.*
import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
- private lateinit var machineModel: SimMachineModel
+ private lateinit var machineModel: MachineModel
@BeforeEach
fun setUp() {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
- machineModel = SimMachineModel(
+ machineModel = MachineModel(
cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
@@ -74,16 +75,31 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var overcommittedWork = 0L
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
- val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider())
+ val engine = FlowEngine(coroutineContext, clock)
+ val virtDriver = SimHost(
+ uid = hostId,
+ name = "test",
+ model = machineModel,
+ meta = emptyMap(),
+ coroutineContext,
+ engine,
+ meterProvider,
+ SimFairShareHypervisorProvider()
+ )
val duration = 5 * 60L
val vmImageA = MockImage(
UUID.randomUUID(),
@@ -91,12 +107,13 @@ internal class SimHostTest {
emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2)
),
+ offset = 1
)
)
)
@@ -106,12 +123,13 @@ internal class SimHostTest {
emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2)
- )
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2)
+ ),
+ offset = 1
)
)
)
@@ -121,20 +139,14 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong()
- grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong()
- overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong()
- return CompletableResultCode.ofSuccess()
+ object : ComputeMetricExporter() {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
},
- exportInterval = duration * 1000
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -155,18 +167,124 @@ internal class SimHostTest {
}
// Ensure last cycle is collected
- delay(1000 * duration)
+ delay(1000L * duration)
virtDriver.close()
reader.close()
assertAll(
- { assertEquals(4197600, requestedWork, "Requested work does not match") },
- { assertEquals(2157600, grantedWork, "Granted work does not match") },
- { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(658, activeTime, "Active time does not match") },
+ { assertEquals(1741, idleTime, "Idle time does not match") },
+ { assertEquals(637, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
+ /**
+ * Test failure of the host.
+ */
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ var activeTime = 0L
+ var idleTime = 0L
+ var uptime = 0L
+ var downtime = 0L
+ var guestUptime = 0L
+ var guestDowntime = 0L
+
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setResource(hostResource)
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val engine = FlowEngine(coroutineContext, clock)
+ val host = SimHost(
+ uid = hostId,
+ name = "test",
+ model = machineModel,
+ meta = emptyMap(),
+ coroutineContext,
+ engine,
+ meterProvider,
+ SimFairShareHypervisorProvider()
+ )
+ val duration = 5 * 60L
+ val image = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2),
+ SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2)
+ ),
+ offset = 1
+ )
+ )
+ )
+ val flavor = MockFlavor(2, 0)
+ val server = MockServer(UUID.randomUUID(), "a", flavor, image)
+
+ // Setup metric reader
+ val reader = CoroutineMetricReader(
+ this, listOf(meterProvider as MetricProducer),
+ object : ComputeMetricExporter() {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
+ }
+
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
+ }
+ },
+ exportInterval = Duration.ofSeconds(duration)
+ )
+
+ coroutineScope {
+ host.spawn(server)
+ delay(5000L)
+ host.fail()
+ delay(duration * 1000)
+ host.recover()
+
+ suspendCancellableCoroutine<Unit> { cont ->
+ host.addListener(object : HostListener {
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ }
+ }
+
+ host.close()
+ // Ensure last cycle is collected
+ delay(1000L * duration)
+
+ reader.close()
+
+ assertAll(
+ { assertEquals(1175, idleTime, "Idle time does not match") },
+ { assertEquals(624, activeTime, "Active time does not match") },
+ { assertEquals(900001, uptime, "Uptime does not match") },
+ { assertEquals(300000, downtime, "Downtime does not match") },
+ { assertEquals(900000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(300000, guestDowntime, "Guest downtime does not match") },
+ )
+ }
+
private class MockFlavor(
override val cpuCount: Int,
override val memorySize: Long
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
new file mode 100644
index 00000000..f240a25f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import io.mockk.coVerify
+import io.mockk.mockk
+import kotlinx.coroutines.delay
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.junit.jupiter.api.Test
+import org.opendc.compute.simulator.SimHost
+import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+
+/**
+ * Test suite for [HostFaultInjector] class.
+ */
+internal class HostFaultInjectorTest {
+ /**
+ * Simple test case to test that nothing happens when the injector is not started.
+ */
+ @Test
+ fun testInjectorNotStarted() = runBlockingSimulation {
+ val host = mockk<SimHost>(relaxUnitFun = true)
+
+ val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+
+ coVerify(exactly = 0) { host.fail() }
+ coVerify(exactly = 0) { host.recover() }
+
+ injector.close()
+ }
+
+ /**
+ * Simple test case to test a start stop fault where the machine is stopped and started after some time.
+ */
+ @Test
+ fun testInjectorStopsMachine() = runBlockingSimulation {
+ val host = mockk<SimHost>(relaxUnitFun = true)
+
+ val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+
+ injector.start()
+
+ delay(Duration.ofDays(55).toMillis())
+
+ injector.close()
+
+ coVerify(exactly = 1) { host.fail() }
+ coVerify(exactly = 1) { host.recover() }
+ }
+
+ /**
+ * Simple test case to test a start stop fault where multiple machines are stopped.
+ */
+ @Test
+ fun testInjectorStopsMultipleMachines() = runBlockingSimulation {
+ val hosts = listOf<SimHost>(
+ mockk(relaxUnitFun = true),
+ mockk(relaxUnitFun = true)
+ )
+
+ val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet())
+
+ injector.start()
+
+ delay(Duration.ofDays(55).toMillis())
+
+ injector.close()
+
+ coVerify(exactly = 1) { hosts[0].fail() }
+ coVerify(exactly = 1) { hosts[1].fail() }
+ coVerify(exactly = 1) { hosts[0].recover() }
+ coVerify(exactly = 1) { hosts[1].recover() }
+ }
+
+ /**
+ * Create a simple start stop fault injector.
+ */
+ private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector {
+ val rng = Well19937c(0)
+ val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03)
+ val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25))
+ val fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+
+ return HostFaultInjector(context, clock, hosts, iat, selector, fault)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
new file mode 100644
index 00000000..28a5e1da
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support library for simulating VM-based workloads with OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcCompute.opendcComputeSimulator)
+
+ implementation(projects.opendcTrace.opendcTraceApi)
+ implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcSimulator.opendcSimulatorCore)
+ implementation(projects.opendcSimulator.opendcSimulatorCompute)
+ implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
+ implementation(libs.opentelemetry.semconv)
+
+ implementation(libs.kotlin.logging)
+ implementation(libs.jackson.databind)
+ implementation(libs.jackson.module.kotlin)
+ implementation(kotlin("reflect"))
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt
new file mode 100644
index 00000000..c94f30e4
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+package org.opendc.compute.workload
+
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (name) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(placements)
+ else -> throw IllegalArgumentException("Unknown policy $name")
+ }
+}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index 1615df3a..78002c2f 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -20,17 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.compute.service.scheduler.weights
+package org.opendc.compute.workload
-import org.opendc.compute.api.Server
-import org.opendc.compute.service.internal.HostView
import java.util.*
/**
- * A [HostWeigher] that assigns random weights to each host every selection.
+ * An interface that describes how a workload is resolved.
*/
-public class RandomWeigher(private val random: Random) : HostWeigher {
- override fun getWeight(host: HostView, server: Server): Double = random.nextDouble()
-
- override fun toString(): String = "RandomWeigher"
+public interface ComputeWorkload {
+ /**
+ * Resolve the workload into a list of [VirtualMachine]s to simulate.
+ */
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
new file mode 100644
index 00000000..1a6624f7
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload
+
+import mu.KotlinLogging
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.trace.*
+import java.io.File
+import java.time.Duration
+import java.time.Instant
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.math.max
+import kotlin.math.roundToLong
+
+/**
+ * A helper class for loading compute workload traces into memory.
+ *
+ * @param baseDir The directory containing the traces.
+ */
+public class ComputeWorkloadLoader(private val baseDir: File) {
+ /**
+ * The logger for this instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The cache of workloads.
+ */
+ private val cache = ConcurrentHashMap<String, List<VirtualMachine>>()
+
+ /**
+ * Read the fragments into memory.
+ */
+ private fun parseFragments(trace: Trace): Map<String, Builder> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+
+ val idCol = reader.resolve(RESOURCE_ID)
+ val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
+ val durationCol = reader.resolve(RESOURCE_STATE_DURATION)
+ val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE)
+
+ val fragments = mutableMapOf<String, Builder>()
+
+ return try {
+ while (reader.nextRow()) {
+ val id = reader.get(idCol) as String
+ val time = reader.get(timestampCol) as Instant
+ val duration = reader.get(durationCol) as Duration
+ val cores = reader.getInt(coresCol)
+ val cpuUsage = reader.getDouble(usageCol)
+
+ val timeMs = time.toEpochMilli()
+ val deadlineMs = timeMs + duration.toMillis()
+ val builder = fragments.computeIfAbsent(id) { Builder() }
+ builder.add(timeMs, deadlineMs, cpuUsage, cores)
+ }
+
+ fragments
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Read the metadata into a workload.
+ */
+ private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
+
+ val idCol = reader.resolve(RESOURCE_ID)
+ val startTimeCol = reader.resolve(RESOURCE_START_TIME)
+ val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
+ val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val memCol = reader.resolve(RESOURCE_MEM_CAPACITY)
+
+ var counter = 0
+ val entries = mutableListOf<VirtualMachine>()
+
+ return try {
+ while (reader.nextRow()) {
+
+ val id = reader.get(idCol) as String
+ if (!fragments.containsKey(id)) {
+ continue
+ }
+
+ val submissionTime = reader.get(startTimeCol) as Instant
+ val endTime = reader.get(stopTimeCol) as Instant
+ val maxCores = reader.getInt(coresCol)
+ val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
+ val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+
+ val builder = fragments.getValue(id)
+ val totalLoad = builder.totalLoad
+
+ entries.add(
+ VirtualMachine(
+ uid,
+ id,
+ maxCores,
+ requiredMemory.roundToLong(),
+ totalLoad,
+ submissionTime,
+ endTime,
+ builder.build()
+ )
+ )
+ }
+
+ // Make sure the virtual machines are ordered by start time
+ entries.sortBy { it.startTime }
+
+ entries
+ } catch (e: Exception) {
+ e.printStackTrace()
+ throw e
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Load the trace with the specified [name] and [format].
+ */
+ public fun get(name: String, format: String): List<VirtualMachine> {
+ return cache.computeIfAbsent(name) {
+ val path = baseDir.resolve(it)
+
+ logger.info { "Loading trace $it at $path" }
+
+ val trace = Trace.open(path, format)
+ val fragments = parseFragments(trace)
+ parseMeta(trace, fragments)
+ }
+ }
+
+ /**
+ * Clear the workload cache.
+ */
+ public fun reset() {
+ cache.clear()
+ }
+
+ /**
+ * A builder for a VM trace.
+ */
+ private class Builder {
+ /**
+ * The total load of the trace.
+ */
+ @JvmField var totalLoad: Double = 0.0
+
+ /**
+ * The internal builder for the trace.
+ */
+ private val builder = SimTrace.builder()
+
+ /**
+ * Add a fragment to the trace.
+ *
+ * @param timestamp Timestamp at which the fragment starts (in epoch millis).
+ * @param deadline Timestamp at which the fragment ends (in epoch millis).
+ * @param usage CPU usage of this fragment.
+ * @param cores Number of cores used.
+ */
+ fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
+ val duration = max(0, deadline - timestamp)
+ totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
+ builder.add(timestamp, deadline, usage, cores)
+ }
+
+ /**
+ * Build the trace.
+ */
+ fun build(): SimTrace = builder.build()
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
new file mode 100644
index 00000000..283f82fe
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload
+
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.telemetry.compute.*
+import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.max
+
+/**
+ * Helper class to simulated VM-based workloads in OpenDC.
+ *
+ * @param context [CoroutineContext] to run the simulation in.
+ * @param clock [Clock] instance tracking simulation time.
+ * @param scheduler [ComputeScheduler] implementation to use for the service.
+ * @param failureModel A failure model to use for injecting failures.
+ * @param interferenceModel The model to use for performance interference.
+ */
+public class ComputeWorkloadRunner(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ scheduler: ComputeScheduler,
+ private val failureModel: FailureModel? = null,
+ private val interferenceModel: VmInterferenceModel? = null,
+) : AutoCloseable {
+ /**
+ * The [ComputeService] that has been configured by the manager.
+ */
+ public val service: ComputeService
+
+ /**
+ * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts.
+ */
+ public val producers: List<MetricProducer>
+ get() = _metricProducers
+ private val _metricProducers = mutableListOf<MetricProducer>()
+
+ /**
+ * The [FlowEngine] to simulate the hosts.
+ */
+ private val engine = FlowEngine(context, clock)
+
+ /**
+ * The hosts that belong to this class.
+ */
+ private val hosts = mutableSetOf<SimHost>()
+
+ init {
+ val (service, serviceMeterProvider) = createService(scheduler)
+ this._metricProducers.add(serviceMeterProvider)
+ this.service = service
+ }
+
+ /**
+ * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace].
+ */
+ public suspend fun run(trace: List<VirtualMachine>, seed: Long) {
+ val random = Random(seed)
+ val injector = failureModel?.createInjector(context, clock, service, random)
+ val client = service.newClient()
+
+ // Create new image for the virtual machine
+ val image = client.newImage("vm-image")
+
+ try {
+ coroutineScope {
+ // Start the fault injector
+ injector?.start()
+
+ var offset = Long.MIN_VALUE
+
+ for (entry in trace.sortedBy { it.startTime }) {
+ val now = clock.millis()
+ val start = entry.startTime.toEpochMilli()
+
+ if (offset < 0) {
+ offset = start - now
+ }
+
+ // Make sure the trace entries are ordered by submission time
+ assert(start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (start - offset) - now))
+
+ launch {
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
+
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.cpuCount,
+ entry.memCapacity
+ ),
+ meta = mapOf("workload" to workload)
+ )
+
+ // Wait for the server reach its end time
+ val endTime = entry.stopTime.toEpochMilli()
+ delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
+
+ // Delete the server after reaching the end-time of the virtual machine
+ server.delete()
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ injector?.close()
+ client.close()
+ }
+ }
+
+ /**
+ * Register a host for this simulation.
+ *
+ * @param spec The definition of the host.
+ * @param optimize Merge the CPU resources of the host into a single CPU resource.
+ * @return The [SimHost] that has been constructed by the runner.
+ */
+ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost {
+ val resource = Resource.builder()
+ .put(HOST_ID, spec.uid.toString())
+ .put(HOST_NAME, spec.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, spec.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size })
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+ _metricProducers.add(meterProvider)
+
+ val host = SimHost(
+ spec.uid,
+ spec.name,
+ spec.model,
+ spec.meta,
+ context,
+ engine,
+ meterProvider,
+ spec.hypervisor,
+ powerDriver = spec.powerDriver,
+ interferenceDomain = interferenceModel?.newDomain(),
+ optimize = optimize
+ )
+
+ hosts.add(host)
+ service.addHost(host)
+
+ return host
+ }
+
+ override fun close() {
+ service.close()
+
+ for (host in hosts) {
+ host.close()
+ }
+
+ hosts.clear()
+ }
+
+ /**
+ * Construct a [ComputeService] instance.
+ */
+ private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val service = ComputeService(context, clock, meterProvider, scheduler)
+ return service to meterProvider
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
new file mode 100644
index 00000000..2f4935ca
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeWorkloads")
+package org.opendc.compute.workload
+
+import org.opendc.compute.workload.internal.CompositeComputeWorkload
+import org.opendc.compute.workload.internal.HpcSampledComputeWorkload
+import org.opendc.compute.workload.internal.LoadSampledComputeWorkload
+import org.opendc.compute.workload.internal.TraceComputeWorkload
+
+/**
+ * Construct a workload from a trace.
+ */
+public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format)
+
+/**
+ * Construct a composite workload with the specified fractions.
+ */
+public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload {
+ return CompositeComputeWorkload(pairs.toMap())
+}
+
+/**
+ * Sample a workload by a [fraction] of the total load.
+ */
+public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload {
+ return LoadSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC VMs (count)
+ */
+public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC load
+ */
+public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction, sampleLoad = true)
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
new file mode 100644
index 00000000..4d9ef15d
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
+ */
+public interface FailureModel {
+ /**
+ * Construct a [HostFaultInjector] for the specified [service].
+ */
+ public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
new file mode 100644
index 00000000..be7120b9
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureModels")
+package org.opendc.compute.workload
+
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
+import java.time.Clock
+import java.time.Duration
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+
+/**
+ * Obtain a [FailureModel] based on the GRID'5000 failure trace.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+public fun grid5000(failureInterval: Duration): FailureModel {
+ return object : FailureModel {
+ override fun createInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ service: ComputeService,
+ random: Random
+ ): HostFaultInjector {
+ val rng = Well19937c(random.nextLong())
+ val hosts = service.hosts.map { it as SimHost }.toSet()
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+ }
+
+ override fun toString(): String = "Grid5000FailureModel"
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
new file mode 100644
index 00000000..5dd239f6
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload
+
+import org.opendc.simulator.compute.workload.SimTrace
+import java.time.Instant
+import java.util.*
+
+/**
+ * A virtual machine workload.
+ *
+ * @param uid The unique identifier of the virtual machine.
+ * @param name The name of the virtual machine.
+ * @param cpuCount The number of vCPUs in the VM.
+ * @param memCapacity The provisioned memory for the VM.
+ * @param startTime The start time of the VM.
+ * @param stopTime The stop time of the VM.
+ * @param trace The trace that belong to this VM.
+ */
+public data class VirtualMachine(
+ val uid: UUID,
+ val name: String,
+ val cpuCount: Int,
+ val memCapacity: Long,
+ val totalLoad: Double,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val trace: SimTrace,
+)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
new file mode 100644
index 00000000..ad182d67
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServiceData
+import java.io.File
+
+/**
+ * A [ComputeMonitor] that logs the events to a Parquet file.
+ */
+public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() {
+ private val serverWriter = ParquetServerDataWriter(
+ File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ private val hostWriter = ParquetHostDataWriter(
+ File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ private val serviceWriter = ParquetServiceDataWriter(
+ File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ override fun record(data: ServerData) {
+ serverWriter.write(data)
+ }
+
+ override fun record(data: HostData) {
+ hostWriter.write(data)
+ }
+
+ override fun record(data: ServiceData) {
+ serviceWriter.write(data)
+ }
+
+ override fun shutdown(): CompletableResultCode {
+ hostWriter.close()
+ serviceWriter.close()
+ serverWriter.close()
+
+ return CompletableResultCode.ofSuccess()
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
new file mode 100644
index 00000000..4172d729
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import mu.KotlinLogging
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.trace.util.parquet.LocalOutputFile
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * A writer that writes data in Parquet format.
+ */
+public abstract class ParquetDataWriter<in T>(
+ path: File,
+ private val schema: Schema,
+ bufferSize: Int = 4096
+) : AutoCloseable {
+ /**
+ * The logging instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The queue of commands to process.
+ */
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * An exception to be propagated to the actual writer.
+ */
+ private var exception: Throwable? = null
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread = thread(start = false, name = this.toString()) {
+ val writer = let {
+ val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
+
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
+
+ try {
+ while (!shouldStop) {
+ try {
+ process(writer, queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
+
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ process(writer, data)
+ }
+ buf.clear()
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Build the [ParquetWriter] used to write the Parquet files.
+ */
+ protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder.build()
+ }
+
+ /**
+ * Convert the specified [data] into a Parquet record.
+ */
+ protected abstract fun convert(builder: GenericRecordBuilder, data: T)
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(data: T) {
+ val exception = exception
+ if (exception != null) {
+ throw IllegalStateException("Writer thread failed", exception)
+ }
+
+ queue.put(data)
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ override fun close() {
+ writerThread.interrupt()
+ writerThread.join()
+ }
+
+ init {
+ writerThread.start()
+ }
+
+ /**
+ * Process the specified [data] to be written to the Parquet file.
+ */
+ private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ writer.write(builder.build())
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
new file mode 100644
index 00000000..98a0739e
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.UUID_SCHEMA
+import org.opendc.trace.util.parquet.optional
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostData]s.
+ */
+public class ParquetHostDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
+
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+
+ builder["host_id"] = data.host.id
+
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ val bootTime = data.bootTime
+ builder["boot_time"] = bootTime?.toEpochMilli()
+
+ builder["cpu_count"] = data.host.cpuCount
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["mem_limit"] = data.host.memCapacity
+
+ builder["power_total"] = data.powerTotal
+
+ builder["guests_terminated"] = data.guestsTerminated
+ builder["guests_running"] = data.guestsRunning
+ builder["guests_error"] = data.guestsError
+ builder["guests_invalid"] = data.guestsInvalid
+ }
+
+ override fun toString(): String = "host-writer"
+
+ private companion object {
+ private val SCHEMA: Schema = SchemaBuilder
+ .record("host")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA).noDefault()
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
+ .requiredDouble("power_total")
+ .requiredInt("guests_terminated")
+ .requiredInt("guests_running")
+ .requiredInt("guests_error")
+ .requiredInt("guests_invalid")
+ .endRecord()
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..0d11ec23
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.UUID_SCHEMA
+import org.opendc.trace.util.parquet.optional
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServerData]s.
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("server_id", true)
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+
+ builder["server_id"] = data.server.id
+ builder["host_id"] = data.host?.id
+
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ val bootTime = data.bootTime
+ builder["boot_time"] = bootTime?.toEpochMilli()
+ builder["scheduling_latency"] = data.schedulingLatency
+
+ builder["cpu_count"] = data.server.cpuCount
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["mem_limit"] = data.server.memCapacity
+ }
+
+ override fun toString(): String = "server-writer"
+
+ private companion object {
+ private val SCHEMA: Schema = SchemaBuilder
+ .record("server")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("server_id").type(UUID_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+ .requiredLong("scheduling_latency")
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
+ .endRecord()
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
new file mode 100644
index 00000000..47824b29
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericRecordBuilder
+import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServiceData]s.
+ */
+public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
+
+ override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+ builder["hosts_up"] = data.hostsUp
+ builder["hosts_down"] = data.hostsDown
+ builder["servers_pending"] = data.serversPending
+ builder["servers_active"] = data.serversActive
+ builder["attempts_success"] = data.attemptsSuccess
+ builder["attempts_failure"] = data.attemptsFailure
+ builder["attempts_error"] = data.attemptsError
+ }
+
+ override fun toString(): String = "service-writer"
+
+ private companion object {
+ private val SCHEMA: Schema = SchemaBuilder
+ .record("service")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredInt("hosts_up")
+ .requiredInt("hosts_down")
+ .requiredInt("servers_pending")
+ .requiredInt("servers_active")
+ .requiredInt("attempts_success")
+ .requiredInt("attempts_failure")
+ .requiredInt("attempts_error")
+ .endRecord()
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
new file mode 100644
index 00000000..9b2bec55
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
+ */
+internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
+
+ val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
+
+ val res = mutableListOf<VirtualMachine>()
+
+ for ((fraction, vms) in traces) {
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+ }
+
+ val vmCount = traces.sumOf { (_, vms) -> vms.size }
+ logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
new file mode 100644
index 00000000..52f4c672
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples HPC VMs in the workload.
+ *
+ * @param fraction The fraction of load/virtual machines to sample
+ * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs.
+ */
+internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The pattern to match compute nodes in the workload.
+ */
+ private val pattern = Regex("^(ComputeNode|cn).*")
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+
+ val (hpc, nonHpc) = vms.partition { entry ->
+ val name = entry.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<VirtualMachine>()
+
+ if (sampleLoad) {
+ var currentLoad = 0.0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * vms.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.totalLoad
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * vms.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.totalLoad
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+
+ /**
+ * Sample a random trace entry.
+ */
+ private fun sample(entry: VirtualMachine, i: Int): VirtualMachine {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
new file mode 100644
index 00000000..ef6de729
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that is sampled based on total load.
+ */
+internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+ val res = mutableListOf<VirtualMachine>()
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
new file mode 100644
index 00000000..c20cb8f3
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] from a trace.
+ */
+internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ return loader.get(name, format)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
new file mode 100644
index 00000000..f3dc1e9e
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.topology
+
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerDriver
+import java.util.*
+
+/**
+ * Description of a physical host that will be simulated by OpenDC and host the virtual machines.
+ *
+ * @param uid Unique identifier of the host.
+ * @param name The name of the host.
+ * @param meta The metadata of the host.
+ * @param model The physical model of the machine.
+ * @param powerDriver The [PowerDriver] to model the power consumption of the machine.
+ * @param hypervisor The hypervisor implementation to use.
+ */
+public data class HostSpec(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerDriver: PowerDriver,
+ val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider()
+)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
new file mode 100644
index 00000000..3b8dc918
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.topology
+
+/**
+ * Representation of the environment of the compute service, describing the physical details of every host.
+ */
+public interface Topology {
+ /**
+ * Resolve the [Topology] into a list of [HostSpec]s.
+ */
+ public fun resolve(): List<HostSpec>
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
new file mode 100644
index 00000000..74f9a1f8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TopologyHelpers")
+package org.opendc.compute.workload.topology
+
+import org.opendc.compute.workload.ComputeWorkloadRunner
+
+/**
+ * Apply the specified [topology] to the given [ComputeWorkloadRunner].
+ */
+public fun ComputeWorkloadRunner.apply(topology: Topology, optimize: Boolean = false) {
+ val hosts = topology.resolve()
+ for (spec in hosts) {
+ registerHost(spec, optimize)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
new file mode 100644
index 00000000..67f9626c
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.util
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
+ */
+public class PerformanceInterferenceReader {
+ /**
+ * The [ObjectMapper] to use.
+ */
+ private val mapper = jacksonObjectMapper()
+
+ init {
+ mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
+ }
+
+ /**
+ * Read the performance interface model from [file].
+ */
+ public fun read(file: File): List<VmInterferenceGroup> {
+ return mapper.readValue(file)
+ }
+
+ /**
+ * Read the performance interface model from the input.
+ */
+ public fun read(input: InputStream): List<VmInterferenceGroup> {
+ return mapper.readValue(input)
+ }
+
+ private data class GroupMixin(
+ @JsonProperty("minServerLoad")
+ val targetLoad: Double,
+ @JsonProperty("performanceScore")
+ val score: Double,
+ @JsonProperty("vms")
+ val members: Set<String>,
+ )
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
new file mode 100644
index 00000000..c79f0584
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.util
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+
+/**
+ * Test suite for the [PerformanceInterferenceReader] class.
+ */
+class PerformanceInterferenceReaderTest {
+ @Test
+ fun testSmoke() {
+ val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
+ val result = PerformanceInterferenceReader().read(input)
+
+ assertAll(
+ { assertEquals(2, result.size) },
+ { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
+ { assertEquals(0.0, result[0].targetLoad, 0.001) },
+ { assertEquals(0.8830158730158756, result[0].score, 0.001) }
+ )
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
new file mode 100644
index 00000000..1be5852b
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
@@ -0,0 +1,22 @@
+[
+ {
+ "vms": [
+ "vm_a",
+ "vm_c",
+ "vm_x",
+ "vm_y"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "vm_a",
+ "vm_b",
+ "vm_c",
+ "vm_d"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]