summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-workload
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-workload')
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt70
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt29
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt12
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt9
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt7
10 files changed, 83 insertions, 70 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index fddb4890..f6744123 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -26,12 +26,12 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
-import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.SimBareMetalMachine
+import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.flow.FlowEngine
import java.time.Clock
@@ -46,55 +46,52 @@ import kotlin.math.max
* @param context [CoroutineContext] to run the simulation in.
* @param clock [Clock] instance tracking simulation time.
* @param scheduler [ComputeScheduler] implementation to use for the service.
- * @param failureModel A failure model to use for injecting failures.
- * @param interferenceModel The model to use for performance interference.
* @param schedulingQuantum The scheduling quantum of the scheduler.
*/
public class ComputeServiceHelper(
private val context: CoroutineContext,
private val clock: Clock,
scheduler: ComputeScheduler,
- private val failureModel: FailureModel? = null,
- private val interferenceModel: VmInterferenceModel? = null,
+ seed: Long,
schedulingQuantum: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
/**
* The [ComputeService] that has been configured by the manager.
*/
- public val service: ComputeService
+ public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum)
/**
* The [FlowEngine] to simulate the hosts.
*/
- private val _engine = FlowEngine(context, clock)
+ private val engine = FlowEngine(context, clock)
/**
* The hosts that belong to this class.
*/
- private val _hosts = mutableSetOf<SimHost>()
+ private val hosts = mutableSetOf<SimHost>()
- init {
- val service = createService(scheduler, schedulingQuantum)
- this.service = service
- }
+ /**
+ * The source of randomness.
+ */
+ private val random = SplittableRandom(seed)
/**
* Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*
* @param trace The trace to simulate.
- * @param seed The seed for the simulation.
- * @param servers A list to which the created servers is added.
* @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
+ * @param failureModel A failure model to use for injecting failures.
+ * @param interference A flag to indicate that VM interference needs to be enabled.
*/
public suspend fun run(
trace: List<VirtualMachine>,
- seed: Long,
- servers: MutableList<Server>? = null,
- submitImmediately: Boolean = false
+ submitImmediately: Boolean = false,
+ failureModel: FailureModel? = null,
+ interference: Boolean = false,
) {
- val random = Random(seed)
- val injector = failureModel?.createInjector(context, clock, service, random)
+ val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong()))
val client = service.newClient()
+ val clock = clock
// Create new image for the virtual machine
val image = client.newImage("vm-image")
@@ -121,10 +118,16 @@ public class ComputeServiceHelper(
delay(max(0, (start - offset) - now))
}
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload(entry.trace, workloadOffset)
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
+ val meta = mutableMapOf<String, Any>("workload" to workload)
+ val interferenceProfile = entry.interferenceProfile
+ if (interference && interferenceProfile != null) {
+ meta["interference-profile"] = interferenceProfile
+ }
+
+ launch {
val server = client.newServer(
entry.name,
image,
@@ -134,11 +137,9 @@ public class ComputeServiceHelper(
entry.memCapacity,
meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
),
- meta = mapOf("workload" to workload)
+ meta = meta
)
- servers?.add(server)
-
// Wait for the server reach its end time
val endTime = entry.stopTime.toEpochMilli()
delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
@@ -164,20 +165,21 @@ public class ComputeServiceHelper(
* @return The [SimHost] that has been constructed by the runner.
*/
public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost {
+ val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver)
+ val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, random)
+
val host = SimHost(
spec.uid,
spec.name,
- spec.model,
spec.meta,
context,
- _engine,
- spec.hypervisor,
- powerDriver = spec.powerDriver,
- interferenceDomain = interferenceModel?.newDomain(),
+ clock,
+ machine,
+ hypervisor,
optimize = optimize
)
- require(_hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
+ require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
service.addHost(host)
return host
@@ -186,11 +188,11 @@ public class ComputeServiceHelper(
override fun close() {
service.close()
- for (host in _hosts) {
+ for (host in hosts) {
host.close()
}
- _hosts.clear()
+ hosts.clear()
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index aa0b5eaf..78002c2f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.workload
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import java.util.*
/**
@@ -32,10 +31,5 @@ public interface ComputeWorkload {
/**
* Resolve the workload into a list of [VirtualMachine]s to simulate.
*/
- public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved
-
- /**
- * A concrete instance of a workload.
- */
- public data class Resolved(val vms: List<VirtualMachine>, val interferenceModel: VmInterferenceModel?)
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 12c2325a..387a3ec2 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -48,7 +48,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* The cache of workloads.
*/
- private val cache = ConcurrentHashMap<String, SoftReference<ComputeWorkload.Resolved>>()
+ private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>()
/**
* Read the fragments into memory.
@@ -87,7 +87,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> {
+ private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
val idCol = reader.resolve(RESOURCE_ID)
@@ -128,7 +128,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
totalLoad,
submissionTime,
endTime,
- builder.build()
+ builder.build(),
+ interferenceModel.getProfile(id)
)
)
}
@@ -159,7 +160,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val modelBuilder = VmInterferenceModel.builder()
while (reader.nextRow()) {
- @Suppress("UNCHECKED_CAST")
val members = reader.getSet(membersCol, String::class.java)!!
val target = reader.getDouble(targetCol)
val score = reader.getDouble(scoreCol)
@@ -177,7 +177,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* Load the trace with the specified [name] and [format].
*/
- public fun get(name: String, format: String): ComputeWorkload.Resolved {
+ public fun get(name: String, format: String): List<VirtualMachine> {
val ref = cache.compute(name) { key, oldVal ->
val inst = oldVal?.get()
if (inst == null) {
@@ -188,11 +188,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val trace = Trace.open(path, format)
val fragments = parseFragments(trace)
- val vms = parseMeta(trace, fragments)
val interferenceModel = parseInterferenceModel(trace)
- val instance = ComputeWorkload.Resolved(vms, interferenceModel)
+ val vms = parseMeta(trace, fragments, interferenceModel)
- SoftReference(instance)
+ SoftReference(vms)
} else {
oldVal
}
@@ -223,6 +222,11 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
private val builder = SimTrace.builder()
/**
+ * The deadline of the previous fragment.
+ */
+ private var previousDeadline = Long.MIN_VALUE
+
+ /**
* Add a fragment to the trace.
*
* @param timestamp Timestamp at which the fragment starts (in epoch millis).
@@ -233,7 +237,14 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
val duration = max(0, deadline - timestamp)
totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
- builder.add(timestamp, deadline, usage, cores)
+
+ if (timestamp != previousDeadline) {
+ // There is a gap between the previous and current fragment; fill the gap
+ builder.add(timestamp, 0.0, cores)
+ }
+
+ builder.add(deadline, usage, cores)
+ previousDeadline = deadline
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
index 88e80719..8560b537 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.workload
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile
import org.opendc.simulator.compute.workload.SimTrace
import java.time.Instant
import java.util.*
@@ -37,6 +38,7 @@ import java.util.*
* @param startTime The start time of the VM.
* @param stopTime The stop time of the VM.
* @param trace The trace that belong to this VM.
+ * @param interferenceProfile The interference profile of this virtual machine.
*/
public data class VirtualMachine(
val uid: UUID,
@@ -48,4 +50,5 @@ public data class VirtualMachine(
val startTime: Instant,
val stopTime: Instant,
val trace: SimTrace,
+ val interferenceProfile: VmInterferenceProfile?
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
index 1959c48d..9b2bec55 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
- val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } }
+ val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
val res = mutableListOf<VirtualMachine>()
- for ((fraction, w) in traces) {
+ for ((fraction, vms) in traces) {
var currentLoad = 0.0
- for (entry in w.vms) {
+ for (entry in vms) {
val entryLoad = entry.totalLoad
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
@@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
}
}
- val vmCount = traces.sumOf { (_, w) -> w.vms.size }
+ val vmCount = traces.sumOf { (_, vms) -> vms.size }
logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
- return ComputeWorkload.Resolved(res, null)
+ return res
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
index 84a77f0f..52f4c672 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
*/
private val pattern = Regex("^(ComputeNode|cn).*")
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
- val (vms, interferenceModel) = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
val (hpc, nonHpc) = vms.partition { entry ->
val name = entry.name
@@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti
logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return ComputeWorkload.Resolved(res, interferenceModel)
+ return res
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
index bc13560c..ef6de729 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
*/
private val logger = KotlinLogging.logger {}
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
- val (vms, interferenceModel) = source.resolve(loader, random)
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
val res = mutableListOf<VirtualMachine>()
val totalLoad = vms.sumOf { it.totalLoad }
@@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
- return ComputeWorkload.Resolved(res, interferenceModel)
+ return res
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index dc9abaef..c20cb8f3 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -24,13 +24,14 @@ package org.opendc.compute.workload.internal
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
import java.util.*
/**
* A [ComputeWorkload] from a trace.
*/
internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
return loader.get(name, format)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
index 45bd9ab1..a0ec4bd6 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
@@ -42,7 +42,6 @@ import java.time.Instant
* @param scope The [CoroutineScope] to run the reader in.
* @param clock The virtual clock.
* @param service The [ComputeService] to monitor.
- * @param servers The [Server]s to monitor.
* @param monitor The monitor to export the metrics to.
* @param exportInterval The export interval.
*/
@@ -50,7 +49,6 @@ public class ComputeMetricReader(
scope: CoroutineScope,
clock: Clock,
private val service: ComputeService,
- private val servers: List<Server>,
private val monitor: ComputeMonitor,
private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
@@ -76,6 +74,11 @@ public class ComputeMetricReader(
*/
private val job = scope.launch {
val intervalMs = exportInterval.toMillis()
+ val service = service
+ val monitor = monitor
+ val hostTableReaders = hostTableReaders
+ val serverTableReaders = serverTableReaders
+ val serviceTableReader = serviceTableReader
try {
while (isActive) {
@@ -91,7 +94,7 @@ public class ComputeMetricReader(
reader.reset()
}
- for (server in servers) {
+ for (server in service.servers) {
val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
reader.record(now)
monitor.record(reader)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
index f3dc1e9e..87530f5a 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
@@ -22,10 +22,9 @@
package org.opendc.compute.workload.topology
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.SimHypervisorProvider
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.power.PowerDriver
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
import java.util.*
/**
@@ -36,7 +35,7 @@ import java.util.*
* @param meta The metadata of the host.
* @param model The physical model of the machine.
* @param powerDriver The [PowerDriver] to model the power consumption of the machine.
- * @param hypervisor The hypervisor implementation to use.
+ * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host.
*/
public data class HostSpec(
val uid: UUID,
@@ -44,5 +43,5 @@ public data class HostSpec(
val meta: Map<String, Any>,
val model: MachineModel,
val powerDriver: PowerDriver,
- val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider()
+ val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer()
)