summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-24 14:29:29 +0200
committerGitHub <noreply@github.com>2021-06-24 14:29:29 +0200
commit36cb3c0cf642990a7b087a56d627a0de4fe2e71f (patch)
tree67c09fa437bc9b1f37f23b80b970b6aa686ad818
parenta29a61334adb8432c69800b19508eca4eff4bfd1 (diff)
parente56967a29ac2b2d26cc085b1f3e27096dad6a170 (diff)
simulator: Support perf interference in uniform resource model
This pull request re-implements the performance interference model to integrate with the uniform resource model in OpenDC. This forms the basis for other forms of resource interference (e.g., network or disk). * Add interface for resource interference in uniform resource model (`opendc-simulator-resources`) * Remove dependency on performance interference model from trace readers * Re-implement the performance interference model on top of the interface in the uniform resource model. **Breaking API Changes** * The original performance interference model classes are removed * The SC20 trace and environment related readers have moved to the Capelin experiments module. * Changes to the interfaces in `opendc-format`. Implements #103
-rw-r--r--buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt43
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt19
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt45
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt)24
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt)28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt)52
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt)16
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt)28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt)33
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt)25
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt15
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json22
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt5
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt67
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt97
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt4
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt15
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt7
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt181
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt134
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt30
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt)24
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt44
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt170
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt61
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt42
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt17
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt16
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt3
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt (renamed from opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt)17
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt35
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt6
47 files changed, 671 insertions, 855 deletions
diff --git a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
index 7fda64a2..6e4cab89 100644
--- a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts
@@ -40,4 +40,5 @@ java {
tasks.withType<KotlinCompile>().configureEach {
kotlinOptions.jvmTarget = Libs.jvmTarget.toString()
kotlinOptions.freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
+ kotlinOptions.freeCompilerArgs += "-Xjvm-default=all"
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index be7bc667..5ea577f3 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -31,17 +31,15 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimHypervisorProvider
import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
-import org.opendc.simulator.compute.power.PowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.failures.FailureDomain
import org.opendc.simulator.resources.SimResourceInterpreter
@@ -61,24 +59,11 @@ public class SimHost(
interpreter: SimResourceInterpreter,
meter: Meter,
hypervisor: SimHypervisorProvider,
- scalingGovernor: ScalingGovernor,
- scalingDriver: PowerDriver,
+ scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
+ powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
+ interferenceDomain: VmInterferenceDomain? = null
) : Host, FailureDomain, AutoCloseable {
-
- public constructor(
- uid: UUID,
- name: String,
- model: MachineModel,
- meta: Map<String, Any>,
- context: CoroutineContext,
- interpreter: SimResourceInterpreter,
- meter: Meter,
- hypervisor: SimHypervisorProvider,
- powerModel: PowerModel = ConstantPowerModel(0.0),
- mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
- ) : this(uid, name, model, meta, context, interpreter, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper)
-
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
@@ -102,13 +87,15 @@ public class SimHost(
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, scalingDriver)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver)
/**
* The hypervisor to run multiple workloads.
*/
public val hypervisor: SimHypervisor = hypervisor.create(
interpreter,
+ scalingGovernor = scalingGovernor,
+ interferenceDomain = interferenceDomain,
listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
@@ -260,7 +247,7 @@ public class SimHost(
}
require(canFit(server)) { "Server does not fit" }
- val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
+ val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel(), server.name))
guests[server] = guest
_guests.add(1)
@@ -317,23 +304,11 @@ public class SimHost(
}
private fun onGuestStart(vm: Guest) {
- guests.forEach { (_, guest) ->
- if (guest.state == ServerState.RUNNING) {
- vm.performanceInterferenceModel?.onStart(vm.server.image.name)
- }
- }
-
_activeGuests.add(1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
}
private fun onGuestStop(vm: Guest) {
- guests.forEach { (_, guest) ->
- if (guest.state == ServerState.RUNNING) {
- vm.performanceInterferenceModel?.onStop(vm.server.image.name)
- }
- }
-
_activeGuests.add(-1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
}
@@ -350,8 +325,6 @@ public class SimHost(
* A virtual machine instance that the driver manages.
*/
private inner class Guest(val server: Server, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
-
var state: ServerState = ServerState.TERMINATED
suspend fun start() {
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 5414d042..5a6fb03d 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -33,11 +33,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Image
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
@@ -50,7 +46,7 @@ import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
-import java.util.UUID
+import java.util.*
import kotlin.coroutines.resume
/**
@@ -85,7 +81,16 @@ internal class SimHostTest {
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider())
+ val virtDriver = SimHost(
+ uid = UUID.randomUUID(),
+ name = "test",
+ model = machineModel,
+ meta = emptyMap(),
+ coroutineContext,
+ interpreter,
+ meterProvider.get("opendc-compute-simulator"),
+ SimFairShareHypervisorProvider()
+ )
val duration = 5 * 60L
val vmImageA = MockImage(
UUID.randomUUID(),
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 47f5f71e..9548253d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -41,11 +41,11 @@ import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
@@ -53,7 +53,6 @@ import org.opendc.simulator.failures.FaultInjector
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
-import java.io.File
import java.time.Clock
import kotlin.coroutines.resume
import kotlin.math.ln
@@ -68,7 +67,7 @@ private val logger = KotlinLogging.logger {}
/**
* Construct the failure domain for the experiments.
*/
-public fun createFailureDomain(
+fun createFailureDomain(
coroutineScope: CoroutineScope,
clock: Clock,
seed: Int,
@@ -100,7 +99,7 @@ public fun createFailureDomain(
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-public fun createFaultInjector(
+fun createFaultInjector(
coroutineScope: CoroutineScope,
clock: Clock,
random: Random,
@@ -119,30 +118,14 @@ public fun createFaultInjector(
}
/**
- * Create the trace reader from which the VM workloads are read.
- */
-public fun createTraceReader(
- path: File,
- performanceInterferenceModel: PerformanceInterferenceModel,
- vms: List<String>,
- seed: Int
-): Sc20StreamingParquetTraceReader {
- return Sc20StreamingParquetTraceReader(
- path,
- performanceInterferenceModel,
- vms,
- Random(seed)
- )
-}
-
-/**
* Construct the environment for a simulated compute service..
*/
-public suspend fun withComputeService(
+suspend fun withComputeService(
clock: Clock,
meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
scheduler: ComputeScheduler,
+ interferenceModel: VmInterferenceModel? = null,
block: suspend CoroutineScope.(ComputeService) -> Unit
): Unit = coroutineScope {
val interpreter = SimResourceInterpreter(coroutineContext, clock)
@@ -158,7 +141,8 @@ public suspend fun withComputeService(
interpreter,
meterProvider.get("opendc-compute-simulator"),
SimFairShareHypervisorProvider(),
- def.powerModel
+ powerDriver = SimplePowerDriver(def.powerModel),
+ interferenceDomain = interferenceModel?.newDomain()
)
}
@@ -181,16 +165,13 @@ public suspend fun withComputeService(
/**
* Attach the specified monitor to the VM provisioner.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
-public suspend fun withMonitor(
+suspend fun withMonitor(
monitor: ExperimentMonitor,
clock: Clock,
metricProducer: MetricProducer,
scheduler: ComputeService,
block: suspend CoroutineScope.() -> Unit
): Unit = coroutineScope {
- val monitorJobs = mutableSetOf<Job>()
-
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -211,24 +192,23 @@ public suspend fun withMonitor(
try {
block(this)
} finally {
- monitorJobs.forEach(Job::cancel)
reader.close()
monitor.close()
}
}
-public class ComputeMetrics {
- public var submittedVms: Int = 0
- public var queuedVms: Int = 0
- public var runningVms: Int = 0
- public var unscheduledVms: Int = 0
- public var finishedVms: Int = 0
+class ComputeMetrics {
+ var submittedVms: Int = 0
+ var queuedVms: Int = 0
+ var runningVms: Int = 0
+ var unscheduledVms: Int = 0
+ var finishedVms: Int = 0
}
/**
* Collect the metrics of the compute service.
*/
-public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
val res = ComputeMetrics()
try {
@@ -247,7 +227,7 @@ public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
/**
* Process the trace.
*/
-public suspend fun processTrace(
+suspend fun processTrace(
clock: Clock,
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
@@ -306,7 +286,7 @@ public suspend fun processTrace(
/**
* Create a [MeterProvider] instance for the experiment.
*/
-public fun createMeterProvider(clock: Clock): MeterProvider {
+fun createMeterProvider(clock: Clock): MeterProvider {
val powerSelector = InstrumentSelector.builder()
.setInstrumentNameRegex("power\\.usage")
.setInstrumentType(InstrumentType.VALUE_RECORDER)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index b70eefb2..cbb5bfd9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -32,29 +32,30 @@ import org.opendc.compute.service.scheduler.*
import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.weights.*
+import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import org.opendc.format.trace.PerformanceInterferenceModelReader
+import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import java.io.File
+import java.io.FileInputStream
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
/**
* A portfolio represents a collection of scenarios are tested for the work.
*
* @param name The name of the portfolio.
*/
-public abstract class Portfolio(name: String) : Experiment(name) {
+abstract class Portfolio(name: String) : Experiment(name) {
/**
* The logger for this portfolio instance.
*/
@@ -71,34 +72,29 @@ public abstract class Portfolio(name: String) : Experiment(name) {
private val vmPlacements by anyOf(emptyMap<String, String>())
/**
- * The path to the performance interference model.
- */
- private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null)
-
- /**
* The topology to test.
*/
- public abstract val topology: Topology
+ abstract val topology: Topology
/**
* The workload to test.
*/
- public abstract val workload: Workload
+ abstract val workload: Workload
/**
* The operational phenomenas to consider.
*/
- public abstract val operationalPhenomena: OperationalPhenomena
+ abstract val operationalPhenomena: OperationalPhenomena
/**
* The allocation policies to consider.
*/
- public abstract val allocationPolicy: String
+ abstract val allocationPolicy: String
/**
* A map of trace readers.
*/
- private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>()
+ private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
/**
* Perform a single trial for this portfolio.
@@ -106,7 +102,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
@OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = Sc20ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
+ val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createComputeScheduler(seeder)
@@ -122,14 +118,17 @@ public abstract class Portfolio(name: String) : Experiment(name) {
val rawReaders = workloadNames.map { workloadName ->
traceReaders.computeIfAbsent(workloadName) {
logger.info { "Loading trace $workloadName" }
- Sc20RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
+ RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
}
}
- val performanceInterferenceModel = performanceInterferenceModel
- ?.takeIf { operationalPhenomena.hasInterference }
- ?.construct(seeder.asKotlinRandom()) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
+ val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
+ PerformanceInterferenceReader(FileInputStream(config.getString("interference-model")))
+ .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) }
+ else
+ null
+
+ val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val monitor = ParquetExperimentMonitor(
File(config.getString("output-path")),
@@ -137,7 +136,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
index 1efd2ddf..d73d14f5 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.format.environment.sc20
+package org.opendc.experiments.capelin.env
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
@@ -37,22 +37,22 @@ import java.util.*
/**
* A [EnvironmentReader] for the internal environment format.
*
- * @param environmentFile The file describing the physical cluster.
+ * @param input The input stream describing the physical cluster.
*/
-public class Sc20ClusterEnvironmentReader(
- private val input: InputStream
-) : EnvironmentReader {
+class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader {
+ /**
+ * Construct a [ClusterEnvironmentReader] for the specified [file].
+ */
+ constructor(file: File) : this(FileInputStream(file))
- public constructor(file: File) : this(FileInputStream(file))
-
- public override fun read(): List<MachineDef> {
+ override fun read(): List<MachineDef> {
var clusterIdCol = 0
var speedCol = 0
var numberOfHostsCol = 0
var memoryPerHostCol = 0
var coresPerHostCol = 0
- var clusterIdx: Int = 0
+ var clusterIdx = 0
var clusterId: String
var speed: Double
var numberOfHosts: Int
@@ -116,5 +116,7 @@ public class Sc20ClusterEnvironmentReader(
return nodes
}
- override fun close() {}
+ override fun close() {
+ input.close()
+ }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 7f25137e..5ad75565 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -26,21 +26,17 @@ import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.TreeSet
/**
* A [TraceReader] for the internal VM workload trace format.
*
- * @param reader The internal trace reader to use.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
- * @param run The run to which this reader belongs.
+ * @param rawReaders The internal raw trace readers to use.
+ * @param workload The workload to read.
+ * @param seed The seed to use for sampling.
*/
-public class Sc20ParquetTraceReader(
- rawReaders: List<Sc20RawParquetTraceReader>,
- performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
+public class ParquetTraceReader(
+ rawReaders: List<RawParquetTraceReader>,
workload: Workload,
seed: Int
) : TraceReader<SimWorkload> {
@@ -59,20 +55,6 @@ public class Sc20ParquetTraceReader(
}
.map { sampleWorkload(it.first, workload, it.second, seed) }
.flatten()
- .run {
- // Apply performance interference model
- if (performanceInterferenceModel.isEmpty())
- this
- else {
- map { entry ->
- val id = entry.name
- val relevantPerformanceInterferenceModelItems =
- performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
-
- entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems))
- }
- }
- }
.iterator()
override fun hasNext(): Boolean = iterator.hasNext()
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
index 4267737d..a19f5699 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,46 +20,46 @@
* SOFTWARE.
*/
-package org.opendc.format.trace.sc20
+package org.opendc.experiments.capelin.trace
+import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.trace.PerformanceInterferenceModelReader
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
import java.io.InputStream
-import java.util.*
-import kotlin.random.Random
/**
- * A parser for the JSON performance interference setup files used for the SC20 paper.
+ * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
*
* @param input The input stream to read from.
* @param mapper The Jackson object mapper to use.
*/
-public class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) :
- PerformanceInterferenceModelReader {
- /**
- * The computed value from the file.
- */
- private val items: Map<String, TreeSet<PerformanceInterferenceModel.Item>>
-
+class PerformanceInterferenceReader(
+ private val input: InputStream,
+ private val mapper: ObjectMapper = jacksonObjectMapper()
+) : AutoCloseable {
init {
- val entries: List<PerformanceInterferenceEntry> = mapper.readValue(input)
- val res = mutableMapOf<String, TreeSet<PerformanceInterferenceModel.Item>>()
- for (entry in entries) {
- val item = PerformanceInterferenceModel.Item(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore)
- for (workload in entry.vms) {
- res.computeIfAbsent(workload) { TreeSet() }.add(item)
- }
- }
+ mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
+ }
- items = res
+ /**
+ * Read the performance interface model from the input.
+ */
+ fun read(): List<VmInterferenceGroup> {
+ return mapper.readValue(input)
}
- override fun construct(random: Random): Map<String, PerformanceInterferenceModel> {
- return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) }
+ override fun close() {
+ input.close()
}
- override fun close() {}
+ private data class GroupMixin(
+ @JsonProperty("minServerLoad")
+ val targetLoad: Double,
+ @JsonProperty("performanceScore")
+ val score: Double,
+ @JsonProperty("vms")
+ val members: Set<String>,
+ )
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index 54151c9f..94193780 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin.trace
-import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -32,14 +31,12 @@ import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
import java.util.UUID
-private val logger = KotlinLogging.logger {}
-
/**
* A [TraceReader] for the internal VM workload trace format.
*
* @param path The directory of the traces.
*/
-public class Sc20RawParquetTraceReader(private val path: File) {
+class RawParquetTraceReader(private val path: File) {
/**
* Read the fragments into memory.
*/
@@ -136,14 +133,5 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the entries in the trace.
*/
- public fun read(): List<TraceEntry<SimWorkload>> = entries
-
- /**
- * Create a [TraceReader] instance.
- */
- public fun createReader(): TraceReader<SimWorkload> {
- return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() {
- override fun close() {}
- }
- }
+ fun read(): List<TraceEntry<SimWorkload>> = entries
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index 6792c2ab..a3b45f47 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -33,8 +33,6 @@ import org.apache.parquet.io.api.Binary
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.format.util.LocalInputFile
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
@@ -44,7 +42,6 @@ import java.util.TreeSet
import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
-import kotlin.random.Random
private val logger = KotlinLogging.logger {}
@@ -52,14 +49,9 @@ private val logger = KotlinLogging.logger {}
* A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
*
* @param traceFile The directory of the traces.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ * @param selectedVms The list of VMs to read from the trace.
*/
-public class Sc20StreamingParquetTraceReader(
- traceFile: File,
- performanceInterferenceModel: PerformanceInterferenceModel? = null,
- selectedVms: List<String> = emptyList(),
- random: Random
-) : TraceReader<SimWorkload> {
+class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
@@ -227,14 +219,6 @@ public class Sc20StreamingParquetTraceReader(
buffers.remove(id)
}
- val relevantPerformanceInterferenceModelItems =
- if (performanceInterferenceModel != null)
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
- Random(random.nextInt())
- )
- else
- null
val workload = SimTraceWorkload(fragments)
val meta = mapOf(
"cores" to maxCores,
@@ -242,13 +226,7 @@ public class Sc20StreamingParquetTraceReader(
"workload" to workload
)
- TraceEntry(
- uid, id, submissionTime, workload,
- if (performanceInterferenceModel != null)
- meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any)
- else
- meta
- )
+ TraceEntry(uid, id, submissionTime, workload, meta)
}
.sortedBy { it.start }
.toList()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index d0031a66..7cd1f159 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -41,7 +41,6 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.sc20.Sc20VmPlacementReader
import org.opendc.format.util.LocalOutputFile
import java.io.BufferedReader
import java.io.File
@@ -53,7 +52,7 @@ import kotlin.math.min
/**
* Represents the command for converting traces
*/
-public class TraceConverterCli : CliktCommand(name = "trace-converter") {
+class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The directory where the trace should be stored.
*/
@@ -149,24 +148,24 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The supported trace conversions.
*/
-public sealed class TraceConversion(name: String) : OptionGroup(name) {
+sealed class TraceConversion(name: String) : OptionGroup(name) {
/**
* Read the fragments of the trace.
*/
- public abstract fun read(
+ abstract fun read(
traceDirectory: File,
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment>
}
-public class SolvinityConversion : TraceConversion("Solvinity") {
+class SolvinityConversion : TraceConversion("Solvinity") {
private val clusters by option()
.split(",")
private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
.file(canBeDir = false)
- .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
+ .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } }
.required()
override fun read(
@@ -335,7 +334,7 @@ public class SolvinityConversion : TraceConversion("Solvinity") {
/**
* Conversion of the Bitbrains public trace.
*/
-public class BitbrainsConversion : TraceConversion("Bitbrains") {
+class BitbrainsConversion : TraceConversion("Bitbrains") {
override fun read(
traceDirectory: File,
metaSchema: Schema,
@@ -447,7 +446,7 @@ public class BitbrainsConversion : TraceConversion("Bitbrains") {
/**
* Conversion of the Azure public VM trace.
*/
-public class AzureConversion : TraceConversion("Azure") {
+class AzureConversion : TraceConversion("Azure") {
private val seed by option(help = "seed for trace sampling")
.long()
.default(0)
@@ -604,18 +603,18 @@ public class AzureConversion : TraceConversion("Azure") {
}
}
-public data class Fragment(
- public val id: String,
- public val tick: Long,
- public val flops: Long,
- public val duration: Long,
- public val usage: Double,
- public val cores: Int
+data class Fragment(
+ val id: String,
+ val tick: Long,
+ val flops: Long,
+ val duration: Long,
+ val usage: Double,
+ val cores: Int
)
-public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long)
+class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
/**
* A script to convert a trace in text format into a Parquet trace.
*/
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
index 61bdea60..7a1683f0 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,32 +20,33 @@
* SOFTWARE.
*/
-package org.opendc.format.trace.sc20
+package org.opendc.experiments.capelin.trace
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.trace.VmPlacementReader
import java.io.InputStream
/**
- * A parser for the JSON VM placement data files used for the SC20 paper.
+ * A parser for the JSON VM placement data files used for the TPDS article on Capelin.
*
* @param input The input stream to read from.
* @param mapper The Jackson object mapper to use.
*/
-public class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) :
- VmPlacementReader {
+public class VmPlacementReader(
+ private val input: InputStream,
+ private val mapper: ObjectMapper = jacksonObjectMapper()
+) : AutoCloseable {
/**
- * The environment that was read from the file.
+ * Read the VM placements from the input.
*/
- private val placements = mapper.readValue<Map<String, String>>(input)
-
- override fun construct(): Map<String, String> {
- return placements
+ public fun read(): Map<String, String> {
+ return mapper.readValue<Map<String, String>>(input)
.mapKeys { "vm__workload__${it.key}.txt" }
.mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00
}
- override fun close() {}
+ override fun close() {
+ input.close()
+ }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 4b21b4f7..08e04ddf 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -34,12 +34,12 @@ import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
+import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
@@ -161,9 +161,8 @@ class CapelinIntegrationTest {
* Obtain the trace reader for the test.
*/
private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return Sc20ParquetTraceReader(
- listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
- emptyMap(),
+ return ParquetTraceReader(
+ listOf(RawParquetTraceReader(File("src/test/resources/trace"))),
Workload("test", fraction),
seed
)
@@ -173,8 +172,8 @@ class CapelinIntegrationTest {
* Obtain the environment reader for the test.
*/
private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
- val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt")
- return Sc20ClusterEnvironmentReader(stream)
+ val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt"))
+ return ClusterEnvironmentReader(stream)
}
class TestExperimentReporter : ExperimentMonitor {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
new file mode 100644
index 00000000..9b1513dc
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+
+/**
+ * Test suite for the [PerformanceInterferenceReader] class.
+ */
+class PerformanceInterferenceReaderTest {
+ @Test
+ fun testSmoke() {
+ val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
+ val reader = PerformanceInterferenceReader(input)
+
+ val result = reader.use { reader.read() }
+
+ assertAll(
+ { assertEquals(2, result.size) },
+ { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
+ { assertEquals(0.0, result[0].targetLoad, 0.001) },
+ { assertEquals(0.8830158730158756, result[0].score, 0.001) }
+ )
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json
new file mode 100644
index 00000000..1be5852b
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json
@@ -0,0 +1,22 @@
+[
+ {
+ "vms": [
+ "vm_a",
+ "vm_c",
+ "vm_x",
+ "vm_y"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "vm_a",
+ "vm_b",
+ "vm_c",
+ "vm_d"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index 28928dcb..8fc4f6b8 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -38,7 +38,7 @@ import org.opendc.compute.service.scheduler.weights.RandomWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
-import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
+import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
@@ -53,7 +53,6 @@ import org.opendc.simulator.resources.SimResourceInterpreter
import java.io.File
import java.time.Clock
import java.util.*
-import kotlin.random.asKotlinRandom
/**
* Experiments for the OpenDC project on Energy modeling.
@@ -88,7 +87,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
val meterProvider: MeterProvider = createMeterProvider(clock)
val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
- val trace = Sc20StreamingParquetTraceReader(File(config.getString("trace-path"), trace), random = Random(1).asKotlinRandom())
+ val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace))
withComputeService(clock, meterProvider, allocationPolicy) { scheduler ->
withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt
deleted file mode 100644
index 58af8453..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment.sc20
-
-import com.fasterxml.jackson.annotation.JsonSubTypes
-import com.fasterxml.jackson.annotation.JsonTypeInfo
-
-/**
- * A topology setup.
- *
- * @property name The name of the setup.
- * @property rooms The rooms in the topology.
- */
-internal data class Setup(val name: String, val rooms: List<Room>)
-
-/**
- * A room in a topology.
- *
- * @property type The type of room in the topology.
- * @property objects The objects in the room.
- */
-internal data class Room(val type: String, val objects: List<RoomObject>)
-
-/**
- * An object in a [Room].
- *
- * @property type The type of the room object.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
-@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)])
-internal sealed class RoomObject(val type: String) {
- /**
- * A rack in a server room.
- *
- * @property machines The machines in the rack.
- */
- internal data class Rack(val machines: List<Machine>) : RoomObject("RACK")
-}
-
-/**
- * A machine in the setup that consists of the specified CPU's represented as
- * integer identifiers and ethernet speed.
- *
- * @property cpus The CPUs in the machine represented as integer identifiers.
- * @property memories The memories in the machine represented as integer identifiers.
- */
-internal data class Machine(val cpus: List<Int>, val memories: List<Int>)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
deleted file mode 100644
index 9b77702e..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.environment.sc20
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
-import java.io.InputStream
-import java.util.*
-
-/**
- * A parser for the JSON experiment setup files used for the SC20 paper.
- *
- * @param input The input stream to read from.
- * @param mapper The Jackson object mapper to use.
- */
-public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
- /**
- * The environment that was read from the file.
- */
- private val setup: Setup = mapper.readValue(input)
-
- /**
- * Read the environment.
- */
- public override fun read(): List<MachineDef> {
- var counter = 0
- return setup.rooms.flatMap { room ->
- room.objects.flatMap { roomObject ->
- when (roomObject) {
- is RoomObject.Rack -> {
- roomObject.machines.map { machine ->
- val cores = machine.cpus.flatMap { id ->
- when (id) {
- 1 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
- }
- 2 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
- }
- else -> throw IllegalArgumentException("The cpu id $id is not recognized")
- }
- }
- val memories = machine.memories.map { id ->
- when (id) {
- 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
- else -> throw IllegalArgumentException("The cpu id $id is not recognized")
- }
- }
- MachineDef(
- UUID(0L, counter++.toLong()),
- "node-$counter",
- emptyMap(),
- MachineModel(cores, memories),
- // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
- // power draw of 350W.
- // Source: https://stackoverflow.com/questions/6128960
- LinearPowerModel(350.0, idlePower = 200.0)
- )
- }
- }
- }
- }
- }
- }
-
- override fun close() {}
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
index 7df1acd3..797a88d5 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
@@ -22,8 +22,6 @@
package org.opendc.format.trace
-import java.io.Closeable
-
/**
* An interface for reading workloads into memory.
*
@@ -31,4 +29,4 @@ import java.io.Closeable
*
* @param T The shape of the workloads supported by this reader.
*/
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable
+public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 769b2b13..aaf8a240 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -24,8 +24,6 @@ package org.opendc.format.trace.bitbrains
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.io.BufferedReader
@@ -38,12 +36,8 @@ import kotlin.math.min
* A [TraceReader] for the public VM workload trace format.
*
* @param traceDirectory The directory of the traces.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
*/
-public class BitbrainsTraceReader(
- traceDirectory: File,
- performanceInterferenceModel: PerformanceInterferenceModel
-) : TraceReader<SimWorkload> {
+public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
@@ -123,12 +117,6 @@ public class BitbrainsTraceReader(
val uuid = UUID(0L, vmId)
- val relevantPerformanceInterferenceModelItems =
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }
- .toSortedSet()
- )
-
val workload = SimTraceWorkload(flopsHistory.asSequence())
entries[vmId] = TraceEntry(
uuid,
@@ -136,7 +124,6 @@ public class BitbrainsTraceReader(
startTime,
workload,
mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
"cores" to cores,
"required-memory" to requiredMemory,
"workload" to workload
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt
deleted file mode 100644
index 0da1f7c2..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.opendc.format.trace.sc20
-
-internal data class PerformanceInterferenceEntry(
- val vms: List<String>,
- val minServerLoad: Double,
- val performanceScore: Double
-)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
deleted file mode 100644
index 1eb4bac2..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.sc20
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.*
-import kotlin.math.max
-import kotlin.math.min
-import kotlin.random.Random
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param traceDirectory The directory of the traces.
- * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
- */
-public class Sc20TraceReader(
- traceDirectory: File,
- performanceInterferenceModel: PerformanceInterferenceModel,
- selectedVms: List<String>,
- random: Random
-) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * Initialize the reader.
- */
- init {
- val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>()
-
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
- val vms = if (selectedVms.isEmpty()) {
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- } else {
- selectedVms.map {
- File(traceDirectory, it)
- }
- }
-
- vms
- .forEachIndexed { idx, vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var timestamp: Long
- var cores = -1
- var minTime = Long.MAX_VALUE
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the trace
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEach { line ->
- val values = line.split(" ")
-
- vmId = vmFile.name
- timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
- cores = values[coreCol].trim().toInt()
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- }
- }
-
- val flopsFragments = sequence {
- var last: SimTraceWorkload.Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(" ")
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
-
- last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) {
- val oldFragment = last!!
- SimTraceWorkload.Fragment(
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores)
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
- }
-
- val uuid = UUID(0, idx.toLong())
-
- val relevantPerformanceInterferenceModelItems =
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(),
- Random(random.nextInt())
- )
- val workload = SimTraceWorkload(flopsFragments.asSequence())
- entries[uuid] = TraceEntry(
- uuid,
- vmId,
- minTime,
- workload,
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to cores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- }
-
- // Create the entry iterator
- iterator = entries.values.sortedBy { it.start }.iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt
deleted file mode 100644
index 4c409887..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.interference
-
-import java.util.*
-import kotlin.random.Random
-
-/**
- * Meta-data key for the [PerformanceInterferenceModel] of an image.
- */
-public const val IMAGE_PERF_INTERFERENCE_MODEL: String = "image:performance-interference"
-
-/**
- * Performance Interference Model describing the variability incurred by different sets of workloads if colocated.
- *
- * @param items The [PerformanceInterferenceModel.Item]s that make up this model.
- */
-public class PerformanceInterferenceModel(
- public val items: SortedSet<Item>,
- private val random: Random = Random(0)
-) {
- private var intersectingItems: List<Item> = emptyList()
- private val colocatedWorkloads = TreeMap<String, Int>()
-
- /**
- * Indicate that a VM has started.
- */
- public fun onStart(name: String) {
- colocatedWorkloads.merge(name, 1, Int::plus)
- intersectingItems = items.filter { item -> doesMatch(item) }
- }
-
- /**
- * Indicate that a VM has stopped.
- */
- public fun onStop(name: String) {
- colocatedWorkloads.computeIfPresent(name) { _, v -> (v - 1).takeUnless { it == 0 } }
- intersectingItems = items.filter { item -> doesMatch(item) }
- }
-
- /**
- * Compute the performance interference based on the current server load.
- */
- public fun apply(currentServerLoad: Double): Double {
- if (intersectingItems.isEmpty()) {
- return 1.0
- }
- val score = intersectingItems
- .firstOrNull { it.minServerLoad <= currentServerLoad }
-
- // Apply performance penalty to (on average) only one of the VMs
- return if (score != null && random.nextInt(score.workloadNames.size) == 0) {
- score.performanceScore
- } else {
- 1.0
- }
- }
-
- private fun doesMatch(item: Item): Boolean {
- var count = 0
- for (
- name in item.workloadNames.subSet(
- colocatedWorkloads.firstKey(),
- colocatedWorkloads.lastKey() + "\u0000"
- )
- ) {
- count += colocatedWorkloads.getOrDefault(name, 0)
- if (count > 1)
- return true
- }
- return false
- }
-
- /**
- * Model describing how a specific set of workloads causes performance variability for each workload.
- *
- * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set.
- * @param minServerLoad The minimum total server load at which this interference is activated and noticeable.
- * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no
- * influence, <1 means that performance degrades, and >1 means that performance improves.
- */
- public data class Item(
- public val workloadNames: SortedSet<String>,
- public val minServerLoad: Double,
- public val performanceScore: Double
- ) : Comparable<Item> {
- override fun equals(other: Any?): Boolean {
- if (this === other) return true
- if (javaClass != other?.javaClass) return false
-
- other as Item
-
- if (workloadNames != other.workloadNames) return false
-
- return true
- }
-
- override fun hashCode(): Int = workloadNames.hashCode()
-
- override fun compareTo(other: Item): Int {
- var cmp = performanceScore.compareTo(other.performanceScore)
- if (cmp != 0) {
- return cmp
- }
-
- cmp = minServerLoad.compareTo(other.minServerLoad)
- if (cmp != 0) {
- return cmp
- }
-
- return hashCode().compareTo(other.hashCode())
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index fb46dab4..d287312f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -23,9 +23,9 @@
package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.*
@@ -39,7 +39,8 @@ import org.opendc.simulator.resources.SimResourceSwitch
*/
public abstract class SimAbstractHypervisor(
private val interpreter: SimResourceInterpreter,
- private val scalingGovernor: ScalingGovernor?
+ private val scalingGovernor: ScalingGovernor? = null,
+ protected val interferenceDomain: VmInterferenceDomain? = null
) : SimHypervisor {
/**
* The machine on which the hypervisor runs.
@@ -87,12 +88,9 @@ public abstract class SimAbstractHypervisor(
return canFit(model, switch)
}
- override fun createMachine(
- model: MachineModel,
- performanceInterferenceModel: PerformanceInterferenceModel?
- ): SimMachine {
+ override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine {
require(canFit(model)) { "Machine does not fit" }
- val vm = VirtualMachine(model, performanceInterferenceModel)
+ val vm = VirtualMachine(model, interferenceId)
_vms.add(vm)
return vm
}
@@ -116,17 +114,18 @@ public abstract class SimAbstractHypervisor(
/**
* A virtual machine running on the hypervisor.
*
- * @property model The machine model of the virtual machine.
- * @property performanceInterferenceModel The performance interference model to utilize.
+ * @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(
- model: MachineModel,
- val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- ) : SimAbstractMachine(interpreter, parent = null, model) {
+ private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(interpreter, parent = null, model) {
+ /**
+ * The interference key of this virtual machine.
+ */
+ private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) }
+
/**
* The vCPUs of the machine.
*/
- override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) }
+ override val cpus = model.cpus.map { VCpu(switch.newOutput(interferenceKey), it) }
override fun close() {
super.close()
@@ -136,6 +135,9 @@ public abstract class SimAbstractHypervisor(
}
_vms.remove(this)
+ if (interferenceKey != null) {
+ interferenceDomain?.leave(interferenceKey)
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
index 2ce51ea6..17130d34 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
@@ -22,8 +22,10 @@
package org.opendc.simulator.compute.kernel
+import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceInterpreter
@@ -32,20 +34,22 @@ import org.opendc.simulator.resources.SimResourceSwitchMaxMin
import org.opendc.simulator.resources.SimResourceSystem
/**
- * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
- * [SimBareMetalMachine] concurrently using weighted fair sharing.
+ * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine]
+ * concurrently using weighted fair sharing.
*
* @param interpreter The interpreter to manage the machine's resources.
* @param parent The parent simulation system.
* @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
+ * @param interferenceDomain The resource interference domain to which the hypervisor belongs.
* @param listener The hypervisor listener to use.
*/
public class SimFairShareHypervisor(
private val interpreter: SimResourceInterpreter,
private val parent: SimResourceSystem? = null,
scalingGovernor: ScalingGovernor? = null,
+ interferenceDomain: VmInterferenceDomain? = null,
private val listener: SimHypervisor.Listener? = null
-) : SimAbstractHypervisor(interpreter, scalingGovernor) {
+) : SimAbstractHypervisor(interpreter, scalingGovernor, interferenceDomain) {
override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean = true
@@ -54,7 +58,7 @@ public class SimFairShareHypervisor(
}
private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem {
- val switch = SimResourceSwitchMaxMin(interpreter, this)
+ val switch = SimResourceSwitchMaxMin(interpreter, this, interferenceDomain)
override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
index 542cd0d2..8d0592ec 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.kernel
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.simulator.resources.SimResourceSystem
@@ -34,6 +36,14 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override fun create(
interpreter: SimResourceInterpreter,
parent: SimResourceSystem?,
+ scalingGovernor: ScalingGovernor?,
+ interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
- ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener = listener)
+ ): SimHypervisor = SimFairShareHypervisor(
+ interpreter,
+ parent,
+ scalingGovernor = scalingGovernor,
+ interferenceDomain = interferenceDomain,
+ listener = listener
+ )
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 40402f5c..e398ab36 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -23,13 +23,12 @@
package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
/**
* A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
- * to a [SimBareMetalMachine].
+ * to another [SimMachine].
*/
public interface SimHypervisor : SimWorkload {
/**
@@ -46,12 +45,9 @@ public interface SimHypervisor : SimWorkload {
* Create a [SimMachine] instance on which users may run a [SimWorkload].
*
* @param model The machine to create.
- * @param performanceInterferenceModel The performance interference model to use.
+ * @param interferenceId An identifier for the interference model.
*/
- public fun createMachine(
- model: MachineModel,
- performanceInterferenceModel: PerformanceInterferenceModel? = null
- ): SimMachine
+ public fun createMachine(model: MachineModel, interferenceId: String? = null): SimMachine
/**
* Event listener for hypervisor events.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
index cafd1ffc..b307a34d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.kernel
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.simulator.resources.SimResourceSystem
@@ -43,6 +45,8 @@ public interface SimHypervisorProvider {
public fun create(
interpreter: SimResourceInterpreter,
parent: SimResourceSystem? = null,
+ scalingGovernor: ScalingGovernor? = null,
+ interferenceDomain: VmInterferenceDomain? = null,
listener: SimHypervisor.Listener? = null
): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
index 3ceebb9a..ac1c0250 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
@@ -31,7 +31,7 @@ import org.opendc.simulator.resources.SimResourceSwitchExclusive
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
-public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter, null) {
+public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) {
override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean {
return switch.inputs.size - switch.outputs.size >= model.cpus.size
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
index fb47d9e5..3906cb9a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.kernel
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.simulator.resources.SimResourceSystem
@@ -34,6 +36,8 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override fun create(
interpreter: SimResourceInterpreter,
parent: SimResourceSystem?,
+ scalingGovernor: ScalingGovernor?,
+ interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
): SimHypervisor = SimSpaceSharedHypervisor(interpreter)
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
index f30e64cf..1801fcd0 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,18 +20,24 @@
* SOFTWARE.
*/
-package org.opendc.format.trace
+package org.opendc.simulator.compute.kernel.interference
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import java.io.Closeable
-import kotlin.random.Random
+import org.opendc.simulator.resources.interference.InterferenceDomain
+import org.opendc.simulator.resources.interference.InterferenceKey
/**
- * An interface for reading descriptions of performance interference models into memory.
+ * The interference domain of a hypervisor.
*/
-public interface PerformanceInterferenceModelReader : Closeable {
+public interface VmInterferenceDomain : InterferenceDomain {
/**
- * Construct a [PerformanceInterferenceModel].
+ * Join this interference domain.
+ *
+ * @param id The identifier of the virtual machine.
*/
- public fun construct(random: Random): Map<String, PerformanceInterferenceModel>
+ public fun join(id: String): InterferenceKey
+
+ /**
+ * Leave this interference domain.
+ */
+ public fun leave(key: InterferenceKey)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt
new file mode 100644
index 00000000..708ddede
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel.interference
+
+/**
+ * A group of virtual machines that together can interfere when operating on the same resources, causing performance
+ * variability.
+ */
+public data class VmInterferenceGroup(
+ /**
+ * The minimum load of the host before the interference occurs.
+ */
+ public val targetLoad: Double,
+
+ /**
+ * A score in [0, 1] representing the performance variability as a result of resource interference.
+ */
+ public val score: Double,
+
+ /**
+ * The members of this interference group.
+ */
+ public val members: Set<String>
+)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
new file mode 100644
index 00000000..c2e00c8e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel.interference
+
+import org.opendc.simulator.resources.interference.InterferenceKey
+import java.util.*
+
+/**
+ * An interference model that models the resource interference between virtual machines on a host.
+ *
+ * @param groups The groups of virtual machines that interfere with each other.
+ * @param random The [Random] instance to select the affected virtual machines.
+ */
+public class VmInterferenceModel(
+ private val groups: List<VmInterferenceGroup>,
+ private val random: Random = Random(0)
+) {
+ /**
+ * Construct a new [VmInterferenceDomain].
+ */
+ public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain {
+ /**
+ * The stateful groups of this domain.
+ */
+ private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) }
+
+ /**
+ * The set of keys active in this domain.
+ */
+ private val keys = mutableSetOf<InterferenceKeyImpl>()
+
+ override fun join(id: String): InterferenceKey {
+ val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad })
+ keys += key
+ return key
+ }
+
+ override fun leave(key: InterferenceKey) {
+ if (key is InterferenceKeyImpl) {
+ keys -= key
+ key.leave()
+ }
+ }
+
+ override fun apply(key: InterferenceKey?, load: Double): Double {
+ if (key == null || key !is InterferenceKeyImpl) {
+ return 1.0
+ }
+
+ val ctx = key.findGroup(load)
+ val group = ctx?.group
+
+ // Apply performance penalty to (on average) only one of the VMs
+ return if (group != null && random.nextInt(group.members.size) == 0) {
+ group.score
+ } else {
+ 1.0
+ }
+ }
+
+ override fun toString(): String = "VmInterferenceDomain"
+ }
+
+ /**
+ * An interference key.
+ *
+ * @param id The identifier of the member.
+ * @param groups The groups to which the key belongs.
+ */
+ private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey {
+ init {
+ for (group in groups) {
+ group.join(this)
+ }
+ }
+
+ /**
+ * Find the active group that applies for the interference member.
+ */
+ fun findGroup(load: Double): GroupContext? {
+ // Find the first active group whose target load is lower than the current load
+ val index = groups.binarySearchBy(load) { it.group.targetLoad }
+ val target = if (index >= 0) index else -(index + 1)
+
+ // Check whether there are active groups ahead of the index
+ for (i in target until groups.size) {
+ val group = groups[i]
+ if (group.group.targetLoad > load) {
+ break
+ } else if (group.isActive) {
+ return group
+ }
+ }
+
+ // Check whether there are active groups before the index
+ for (i in (target - 1) downTo 0) {
+ val group = groups[i]
+ if (group.isActive) {
+ return group
+ }
+ }
+
+ return null
+ }
+
+ /**
+ * Leave all the groups.
+ */
+ fun leave() {
+ for (group in groups) {
+ group.leave(this)
+ }
+ }
+ }
+
+ /**
+ * A group context is used to track the active keys per interference group.
+ */
+ private inner class GroupContext(val group: VmInterferenceGroup) {
+ /**
+ * The active keys that are part of this group.
+ */
+ private val keys = mutableSetOf<InterferenceKeyImpl>()
+
+ /**
+ * A flag to indicate that the group is active.
+ */
+ val isActive
+ get() = keys.size > 1
+
+ /**
+ * Determine whether the specified [id] is part of this group.
+ */
+ operator fun contains(id: String): Boolean = id in group.members
+
+ /**
+ * Join this group with the specified [key].
+ */
+ fun join(key: InterferenceKeyImpl) {
+ keys += key
+ }
+
+ /**
+ * Leave this group with the specified [key].
+ */
+ fun leave(key: InterferenceKeyImpl) {
+ keys -= key
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 892d5223..a6d955ca 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -24,12 +24,9 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
+import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertArrayEquals
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.compute.device.SimNetworkAdapter
import org.opendc.simulator.compute.model.*
import org.opendc.simulator.compute.power.ConstantPowerModel
@@ -157,8 +154,10 @@ class SimMachineTest {
try {
coroutineScope {
launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) }
- assertEquals(100.0, machine.psu.powerDraw)
- assertEquals(100.0, source.powerDraw)
+ assertAll(
+ { assertEquals(100.0, machine.psu.powerDraw) },
+ { assertEquals(100.0, source.powerDraw) }
+ )
}
} finally {
machine.close()
@@ -284,6 +283,7 @@ class SimMachineTest {
}
}
+ @Test
fun testDiskWriteUsage() = runBlockingSimulation {
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index 71d48a31..a61cba8d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -34,6 +34,8 @@ import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertDoesNotThrow
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -223,4 +225,63 @@ internal class SimHypervisorTest {
machine.close()
}
+
+ @Test
+ fun testInterference() = runBlockingSimulation {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val model = MachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+
+ val groups = listOf(
+ VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")),
+ VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")),
+ VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
+ )
+ val interferenceModel = VmInterferenceModel(groups)
+
+ val platform = SimResourceInterpreter(coroutineContext, clock)
+ val machine = SimBareMetalMachine(
+ platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+ val hypervisor = SimFairShareHypervisor(platform, interferenceDomain = interferenceModel.newDomain())
+
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ ),
+ )
+ val workloadB =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
+ )
+ )
+
+ launch {
+ machine.run(hypervisor)
+ }
+
+ coroutineScope {
+ launch {
+ val vm = hypervisor.createMachine(model, "a")
+ vm.run(workloadA)
+ vm.close()
+ }
+ val vm = hypervisor.createMachine(model, "b")
+ vm.run(workloadB)
+ vm.close()
+ }
+
+ machine.close()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 84217278..8a24b3e7 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -116,8 +116,8 @@ public abstract class SimAbstractResourceAggregator(
updateCounters(ctx, work)
}
- override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- return _inputConsumers.sumOf { it.remainingWork }
+ override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ return work - _inputConsumers.sumOf { it.remainingWork }
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
index 6bfbfc99..f384582f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.interference.InterferenceKey
+
/**
* A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
*/
@@ -33,6 +35,8 @@ public interface SimResourceDistributor : SimResourceConsumer {
/**
* Create a new output for the distributor.
+ *
+ * @param key The key of the interference member to which the output belongs.
*/
- public fun newOutput(): SimResourceCloseableProvider
+ public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index d8fc8cb6..398797cf 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -22,15 +22,21 @@
package org.opendc.simulator.resources
-import kotlin.math.max
+import org.opendc.simulator.resources.interference.InterferenceDomain
+import org.opendc.simulator.resources.interference.InterferenceKey
import kotlin.math.min
/**
* A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
+ *
+ * @param interpreter The interpreter for managing the resource contexts.
+ * @param parent The parent resource system of the distributor.
+ * @param interferenceDomain The interference domain of the distributor.
*/
public class SimResourceDistributorMaxMin(
private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null
+ private val parent: SimResourceSystem? = null,
+ private val interferenceDomain: InterferenceDomain? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceCloseableProvider>
get() = _outputs
@@ -56,9 +62,14 @@ public class SimResourceDistributorMaxMin(
*/
private var totalAllocatedSpeed = 0.0
+ /**
+ * The total requested speed for the output resources.
+ */
+ private var totalRequestedSpeed = 0.0
+
/* SimResourceDistributor */
- override fun newOutput(): SimResourceCloseableProvider {
- val provider = Output(ctx?.capacity ?: 0.0)
+ override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
+ val provider = Output(ctx?.capacity ?: 0.0, key)
_outputs.add(provider)
return provider
}
@@ -148,6 +159,7 @@ public class SimResourceDistributorMaxMin(
assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
this.totalRequestedWork = totalRequestedWork
+ this.totalRequestedSpeed = totalRequestedSpeed
this.totalAllocatedSpeed = capacity - availableSpeed
val totalAllocatedWork = min(
totalRequestedWork,
@@ -169,7 +181,7 @@ public class SimResourceDistributorMaxMin(
/**
* An internal [SimResourceProvider] implementation for switch outputs.
*/
- private inner class Output(capacity: Double) :
+ private inner class Output(capacity: Double, private val key: InterferenceKey?) :
SimAbstractResourceProvider(interpreter, parent, capacity),
SimResourceCloseableProvider,
SimResourceProviderLogic,
@@ -216,7 +228,6 @@ public class SimResourceDistributorMaxMin(
check(!isClosed) { "Cannot re-use closed output" }
activeOutputs += this
-
interpreter.batch {
ctx.start()
// Interrupt the input to re-schedule the resources
@@ -262,19 +273,22 @@ public class SimResourceDistributorMaxMin(
lastCommandTimestamp = ctx.clock.millis()
}
- override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0
- return if (work > 0.0) {
- // Compute the fraction of compute time allocated to the output
- val fraction = actualSpeed / totalAllocatedSpeed
+ // Compute the fraction of compute time allocated to the output
+ val fraction = actualSpeed / totalAllocatedSpeed
- // Compute the work that was actually granted to the output.
- val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction
- max(0.0, work - processingAvailable)
+ // Compute the performance penalty due to resource interference
+ val perfScore = if (interferenceDomain != null) {
+ val load = totalAllocatedSpeed / requireNotNull(this@SimResourceDistributorMaxMin.ctx).capacity
+ interferenceDomain.apply(key, load)
} else {
- 0.0
+ 1.0
}
+
+ // Compute the work that was actually granted to the output.
+ return (totalRequestedWork - totalRemainingWork) * fraction * perfScore
}
/* Comparable */
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
index 5231ecf5..17045557 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import kotlin.math.max
-
/**
* The logic of a resource provider.
*/
@@ -63,19 +61,18 @@ public interface SimResourceProviderLogic {
public fun onFinish(ctx: SimResourceControllableContext)
/**
- * Get the remaining work to process after a resource consumption.
+ * Compute the amount of work that was consumed over the specified [duration].
*
- * @param work The size of the resource consumption.
- * @param speed The speed of consumption.
+ * @param work The total size of the resource consumption.
+ * @param speed The speed of the resource provider.
* @param duration The duration from the start of the consumption until now.
- * @return The amount of work remaining.
+ * @return The amount of work that was consumed by the resource provider.
*/
- public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ public fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
return if (duration > 0L) {
- val processed = duration / 1000.0 * speed
- max(0.0, work - processed)
+ return (duration / 1000.0) * speed
} else {
- 0.0
+ work
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index f6e7b22f..d2aab634 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.interference.InterferenceKey
+
/**
* A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
*/
@@ -43,8 +45,10 @@ public interface SimResourceSwitch : AutoCloseable {
/**
* Create a new output on the switch.
+ *
+ * @param key The key of the interference member to which the output belongs.
*/
- public fun newOutput(): SimResourceCloseableProvider
+ public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider
/**
* Add the specified [input] to the switch.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 4ff741ed..fbb541e5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -22,6 +22,7 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.interference.InterferenceKey
import java.util.ArrayDeque
/**
@@ -61,7 +62,10 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
}
- override fun newOutput(): SimResourceCloseableProvider {
+ /**
+ * Add an output to the switch.
+ */
+ override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index 50d58798..ceb5a1a4 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -22,13 +22,21 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.interference.InterferenceDomain
+import org.opendc.simulator.resources.interference.InterferenceKey
+
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
+ *
+ * @param interpreter The interpreter for managing the resource contexts.
+ * @param parent The parent resource system of the switch.
+ * @param interferenceDomain The interference domain of the switch.
*/
public class SimResourceSwitchMaxMin(
interpreter: SimResourceInterpreter,
- parent: SimResourceSystem? = null
+ parent: SimResourceSystem? = null,
+ interferenceDomain: InterferenceDomain? = null
) : SimResourceSwitch {
/**
* The output resource providers to which resource consumers can be attached.
@@ -61,7 +69,7 @@ public class SimResourceSwitchMaxMin(
/**
* The distributor to distribute the aggregated resources.
*/
- private val distributor = SimResourceDistributorMaxMin(interpreter, parent)
+ private val distributor = SimResourceDistributorMaxMin(interpreter, parent, interferenceDomain)
init {
aggregator.startConsumer(distributor)
@@ -70,10 +78,10 @@ public class SimResourceSwitchMaxMin(
/**
* Add an output to the switch.
*/
- override fun newOutput(): SimResourceCloseableProvider {
+ override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider {
check(!isClosed) { "Switch has been closed" }
- return distributor.newOutput()
+ return distributor.newOutput(key)
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 90c7bc75..98fad068 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -24,6 +24,7 @@ package org.opendc.simulator.resources.impl
import org.opendc.simulator.resources.*
import java.time.Clock
+import kotlin.math.max
import kotlin.math.min
/**
@@ -318,7 +319,7 @@ internal class SimResourceContextImpl(
*/
private fun computeRemainingWork(now: Long): Double {
return if (_work > 0.0)
- logic.getRemainingWork(this, _work, speed, now - _timestamp)
+ max(0.0, _work - logic.getConsumedWork(this, _work, speed, now - _timestamp))
else 0.0
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt
new file mode 100644
index 00000000..1066777f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt
@@ -0,0 +1,19 @@
+package org.opendc.simulator.resources.interference
+
+import org.opendc.simulator.resources.SimResourceConsumer
+
+/**
+ * An interference domain represents a system of resources where [resource consumers][SimResourceConsumer] may incur
+ * performance variability due to operating on the same resources and therefore causing interference.
+ */
+public interface InterferenceDomain {
+ /**
+ * Compute the performance score of a participant in this interference domain.
+ *
+ * @param key The participant to obtain the score of or `null` if the participant has no key.
+ * @param load The overall load on the interference domain.
+ * @return A score representing the performance score to be applied to the resource consumer, with 1
+ * meaning no influence, <1 means that performance degrades, and >1 means that performance improves.
+ */
+ public fun apply(key: InterferenceKey?, load: Double): Double
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt
index 6861affe..8b12e7b4 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,16 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.format.trace
-
-import java.io.Closeable
+package org.opendc.simulator.resources.interference
/**
- * An interface for reading VM placement data into memory.
+ * A key that uniquely identifies a participant of an interference domain.
*/
-public interface VmPlacementReader : Closeable {
- /**
- * Construct a map of VMs to clusters.
- */
- public fun construct(): Map<String, String>
-}
+public interface InterferenceKey
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt
index 09f7de35..d0b97d90 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -47,14 +47,16 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
-import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
+import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
import kotlin.random.Random
+import kotlin.random.asJavaRandom
private val logger = KotlinLogging.logger {}
@@ -62,7 +64,7 @@ private val logger = KotlinLogging.logger {}
* Represents the CLI command for starting the OpenDC web runner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public class RunnerCli : CliktCommand(name = "runner") {
+class RunnerCli : CliktCommand(name = "runner") {
/**
* The name of the database to use.
*/
@@ -167,8 +169,8 @@ public class RunnerCli : CliktCommand(name = "runner") {
tracePath,
scenario.getEmbedded(listOf("trace", "traceId"), String::class.java)
)
- val traceReader = Sc20RawParquetTraceReader(traceDir)
- val performanceInterferenceReader = let {
+ val traceReader = RawParquetTraceReader(traceDir)
+ val interferenceGroups = let {
val path = File(traceDir, "performance-interference-model.json")
val operational = scenario.get("operational", Document::class.java)
val enabled = operational.getBoolean("performanceInterferenceEnabled")
@@ -177,17 +179,18 @@ public class RunnerCli : CliktCommand(name = "runner") {
return@let null
}
- path.inputStream().use { Sc20PerformanceInterferenceReader(it) }
+ PerformanceInterferenceReader(path.inputStream()).use { reader -> reader.read() }
}
val targets = portfolio.get("targets", Document::class.java)
val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
val environment = topologyParser.read(topologyId)
- val results = (0 until targets.getInteger("repeatsPerScenario")).map {
- logger.info { "Starting repeat $it" }
+ val results = (0 until targets.getInteger("repeatsPerScenario")).map { repeat ->
+ logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
- runRepeat(scenario, it, environment, traceReader, performanceInterferenceReader)
+ val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) }
+ runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
}
}
@@ -203,8 +206,8 @@ public class RunnerCli : CliktCommand(name = "runner") {
scenario: Document,
repeat: Int,
environment: EnvironmentReader,
- traceReader: Sc20RawParquetTraceReader,
- performanceInterferenceReader: Sc20PerformanceInterferenceReader?
+ traceReader: RawParquetTraceReader,
+ interferenceModel: VmInterferenceModel?
): WebExperimentMonitor.Result {
val monitor = WebExperimentMonitor()
@@ -267,16 +270,14 @@ public class RunnerCli : CliktCommand(name = "runner") {
else -> throw IllegalArgumentException("Unknown policy $policyName")
}
- val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(
+ val trace = ParquetTraceReader(
listOf(traceReader),
- performanceInterferenceModel,
Workload(workloadName, workloadFraction),
seed
)
val failureFrequency = if (operational.getBoolean("failuresEnabled", false)) 24.0 * 7 else 0.0
- withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler ->
val failureDomain = if (failureFrequency > 0) {
logger.debug { "ENABLING failures" }
createFailureDomain(
@@ -377,4 +378,4 @@ public class RunnerCli : CliktCommand(name = "runner") {
/**
* Main entry point of the runner.
*/
-public fun main(args: Array<String>): Unit = RunnerCli().main(args)
+fun main(args: Array<String>): Unit = RunnerCli().main(args)
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
index 413112af..38c774a9 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
@@ -25,7 +25,6 @@ package org.opendc.workflow.service
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -57,7 +56,6 @@ import kotlin.math.max
* Integration test suite for the [WorkflowServiceImpl].
*/
@DisplayName("WorkflowServiceImpl")
-@OptIn(ExperimentalCoroutinesApi::class)
internal class WorkflowServiceIntegrationTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
@@ -70,7 +68,7 @@ internal class WorkflowServiceIntegrationTest {
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
+ val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json")))
.use { it.read() }
.map { def ->
SimHost(
@@ -106,7 +104,7 @@ internal class WorkflowServiceIntegrationTest {
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
)
- val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
+ val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf")))
var offset = Long.MIN_VALUE
coroutineScope {