Diffstat (limited to 'opendc-experiments')
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt | 256
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt | 67
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt | 7
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt | 8
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt | 86
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt | 222
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt | 38
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt | 97
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 190
-rw-r--r--  opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt | 3
10 files changed, 574 insertions(+), 400 deletions(-)
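
At a glance, this change deletes the ad-hoc helpers in ExperimentHelpers.kt (withComputeService, processTrace, createMeterProvider, createFaultInjector) and replaces them with a reusable ComputeServiceSimulator plus small utilities under util/. A minimal before/after sketch, assembled only from identifiers that appear in the hunks below (surrounding setup such as clock, monitor and the readers is assumed):

    // Old style, removed by this change (ExperimentHelpers.kt):
    withComputeService(clock, meterProvider, environmentReader, scheduler) { service ->
        withMonitor(service, clock, meterProvider as MetricProducer, monitor) {
            processTrace(clock, traceReader, service, monitor)
        }
    }

    // New style, introduced by this change (util/ComputeServiceSimulator.kt):
    val simulator = ComputeServiceSimulator(coroutineContext, clock, scheduler, environmentReader.read())
    val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
    try {
        simulator.run(traceReader)
    } finally {
        simulator.close()
        metricReader.close()
    }
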
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
deleted file mode 100644
index 8227bca9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import kotlinx.coroutines.*
-import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.commons.math3.random.Well19937c
-import org.opendc.compute.api.*
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.ReplayScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
-import org.opendc.compute.service.scheduler.weights.RamWeigher
-import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import org.opendc.compute.simulator.failure.StartStopHostFault
-import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.sdk.toOtelClock
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
-
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-fun createFaultInjector(
- context: CoroutineContext,
- clock: Clock,
- hosts: Set<SimHost>,
- seed: Int,
- failureInterval: Double
-): HostFaultInjector {
- val rng = Well19937c(seed)
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
-}
-
-/**
- * Construct the environment for a simulated compute service..
- */
-suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- environmentReader: EnvironmentReader,
- scheduler: ComputeScheduler,
- interferenceModel: VmInterferenceModel? = null,
- block: suspend CoroutineScope.(ComputeService) -> Unit
-): Unit = coroutineScope {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = environmentReader
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- interpreter,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
-}
-
-/**
- * Process the trace.
- */
-suspend fun processTrace(
- clock: Clock,
- reader: TraceReader<SimWorkload>,
- scheduler: ComputeService,
- monitor: ComputeMonitor? = null,
-) {
- val client = scheduler.newClient()
- val watcher = object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.onStateChange(clock.millis(), server, newState)
- }
- }
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- var offset = Long.MIN_VALUE
-
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
-
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
-
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
- server.watch(watcher)
-
- // Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
-
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
- }
- }
- }
-
- yield()
- } finally {
- reader.close()
- client.close()
- }
-}
-
-/**
- * Create a [MeterProvider] instance for the experiment.
- */
-fun createMeterProvider(clock: Clock): MeterProvider {
- return SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-}
-
-/**
- * Create a [ComputeScheduler] for the experiment.
- */
-fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
- val cpuAllocationRatio = 16.0
- val ramAllocationRatio = 1.5
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = 1.0))
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = -1.0))
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = -1.0))
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = emptyList(),
- subsetSize = Int.MAX_VALUE,
- random = java.util.Random(seeder.nextLong())
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 82794471..f7f9336e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,10 +23,7 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.io.FileInputStream
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
+import kotlin.math.roundToLong
/**
 * A portfolio represents a collection of scenarios that are tested for the work.
@@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) {
/**
* Perform a single trial for this portfolio.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
-
- val meterProvider = createMeterProvider(clock)
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
} else {
listOf(workload.name)
}
-
val rawReaders = workloadNames.map { workloadName ->
traceReaders.computeIfAbsent(workloadName) {
logger.info { "Loading trace $workloadName" }
RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
}
}
-
+ val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
.read(FileInputStream(config.getString("interference-model")))
@@ -126,43 +122,36 @@ abstract class Portfolio(name: String) : Experiment(name) {
else
null
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
+ val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
+ val failureModel =
+ if (operationalPhenomena.failureFrequency > 0)
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ else
+ null
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ performanceInterferenceModel
+ )
val monitor = ParquetExportMonitor(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
- withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
- val faultInjector = if (operationalPhenomena.failureFrequency > 0) {
- logger.debug("ENABLING failures")
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- operationalPhenomena.failureFrequency,
- )
- } else {
- null
- }
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
- }
-
- faultInjector?.close()
- monitor.close()
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0])
logger.debug {
"Finish " +
"SUBMIT=${monitorResults.instanceCount} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 7062a275..fa00fc35 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -28,7 +28,6 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.HostData
import java.io.File
@@ -46,8 +45,8 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
override fun convert(builder: GenericRecordBuilder, data: HostData) {
builder["timestamp"] = data.timestamp
- builder["host_id"] = data.host.name
- builder["powered_on"] = data.host.state == HostState.UP
+ builder["host_id"] = data.host.id
+ builder["powered_on"] = true
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
builder["total_work"] = data.totalWork
@@ -58,7 +57,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
builder["cpu_demand"] = data.cpuDemand
builder["power_draw"] = data.powerDraw
builder["num_instances"] = data.instanceCount
- builder["num_cpus"] = data.host.model.cpuCount
+ builder["num_cpus"] = data.host.cpuCount
}
override fun toString(): String = "host-writer"
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
index 9904adde..bb2db4b7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -46,12 +46,12 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
override fun convert(builder: GenericRecordBuilder, data: ServerData) {
builder["timestamp"] = data.timestamp
- builder["server_id"] = data.server.uid.toString()
- builder["state"] = data.server.state
+ builder["server_id"] = data.server
+ // builder["state"] = data.server.state
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
- builder["num_vcpus"] = data.server.flavor.cpuCount
- builder["mem_capacity"] = data.server.flavor.memorySize
+ // builder["num_vcpus"] = data.server.flavor.cpuCount
+ // builder["mem_capacity"] = data.server.flavor.memorySize
}
override fun toString(): String = "server-writer"
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
new file mode 100644
index 00000000..3b7c3f0f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+}
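
A brief usage sketch for the factory above; the policy names are the string keys of the when expression, and the seeding mirrors the call site in Portfolio.kt (the concrete policy and placement mapping shown here are illustrative):

    import java.util.Random

    val seeder = Random(0)
    // Prefers hosts with the fewest active instances.
    val scheduler = createComputeScheduler("active-servers", seeder)

    // The "replay" policy additionally needs recorded VM placements (hypothetical mapping shown).
    val replay = createComputeScheduler("replay", seeder, vmPlacements = mapOf("vm-1" to "host-1"))
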
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
new file mode 100644
index 00000000..065a8c93
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.env.MachineDef
+import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.*
+import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.max
+
+/**
+ * Helper class to manage a [ComputeService] simulation.
+ */
+class ComputeServiceSimulator(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ scheduler: ComputeScheduler,
+ machines: List<MachineDef>,
+ private val failureModel: FailureModel? = null,
+ interferenceModel: VmInterferenceModel? = null,
+ hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
+) : AutoCloseable {
+ /**
+ * The [ComputeService] that has been configured by the manager.
+ */
+ val service: ComputeService
+
+ /**
+ * The [MetricProducer]s used by the [ComputeService] and the simulated hosts.
+ */
+ val producers: List<MetricProducer>
+ get() = _metricProducers
+ private val _metricProducers = mutableListOf<MetricProducer>()
+
+ /**
+ * The [SimResourceInterpreter] to simulate the hosts.
+ */
+ private val interpreter = SimResourceInterpreter(context, clock)
+
+ /**
+ * The simulated hosts managed by this simulator.
+ */
+ private val hosts = mutableSetOf<SimHost>()
+
+ init {
+ val (service, serviceMeterProvider) = createService(scheduler)
+ this._metricProducers.add(serviceMeterProvider)
+ this.service = service
+
+ for (def in machines) {
+ val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
+ this._metricProducers.add(hostMeterProvider)
+ hosts.add(host)
+ this.service.addHost(host)
+ }
+ }
+
+ /**
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ */
+ suspend fun run(reader: TraceReader<SimWorkload>) {
+ val injector = failureModel?.createInjector(context, clock, service)
+ val client = service.newClient()
+
+ // Create new image for the virtual machine
+ val image = client.newImage("vm-image")
+
+ try {
+ coroutineScope {
+ // Start the fault injector
+ injector?.start()
+
+ var offset = Long.MIN_VALUE
+
+ while (reader.hasNext()) {
+ val entry = reader.next()
+
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ // Make sure the trace entries are ordered by submission time
+ assert(entry.start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (entry.start - offset) - clock.millis()))
+
+ launch {
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta + mapOf("workload" to workload)
+ )
+
+ // Wait for the server to reach its end time
+ val endTime = entry.meta["end-time"] as Long
+ delay(endTime + workloadOffset - clock.millis() + 1)
+
+ // Delete the server after reaching the end-time of the virtual machine
+ server.delete()
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ injector?.close()
+ reader.close()
+ client.close()
+ }
+ }
+
+ override fun close() {
+ service.close()
+
+ for (host in hosts) {
+ host.close()
+ }
+
+ hosts.clear()
+ }
+
+ /**
+ * Construct a [ComputeService] instance.
+ */
+ private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val service = ComputeService(context, clock, meterProvider, scheduler)
+ return service to meterProvider
+ }
+
+ /**
+ * Construct a [SimHost] instance for the specified [MachineDef].
+ */
+ private fun createHost(
+ def: MachineDef,
+ hypervisorProvider: SimHypervisorProvider,
+ interferenceModel: VmInterferenceModel? = null
+ ): Pair<SimHost, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(HOST_ID, def.uid.toString())
+ .put(HOST_NAME, def.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, def.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val host = SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ context,
+ interpreter,
+ meterProvider,
+ hypervisorProvider,
+ powerDriver = SimplePowerDriver(def.powerModel),
+ interferenceDomain = interferenceModel?.newDomain()
+ )
+
+ return host to meterProvider
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
new file mode 100644
index 00000000..83393896
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
+ */
+interface FailureModel {
+ /**
+ * Construct a [HostFaultInjector] for the specified [service].
+ */
+ fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+}
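
A minimal sketch of a custom implementation of the interface above, delegating to the createFaultInjector helper kept in FailureModels.kt below; the class name and the one-week interval are illustrative:

    class WeeklyFailureModel(private val seed: Int) : FailureModel {
        override fun createInjector(
            context: CoroutineContext,
            clock: Clock,
            service: ComputeService
        ): HostFaultInjector {
            // Target every simulated host; the interval is in hours, matching the old test's 24.0 * 7.
            val hosts = service.hosts.map { it as SimHost }.toSet()
            return createFaultInjector(context, clock, hosts, seed, failureInterval = 24.0 * 7)
        }
    }
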
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
new file mode 100644
index 00000000..89b4a31c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureModels")
+package org.opendc.experiments.capelin
+
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
+import org.opendc.experiments.capelin.util.FailureModel
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+import kotlin.random.Random
+
+/**
+ * Obtain a [FailureModel] based on the GRID'5000 failure trace.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
+ return object : FailureModel {
+ override fun createInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ service: ComputeService
+ ): HostFaultInjector {
+ val rng = Well19937c(seed)
+ val hosts = service.hosts.map { it as SimHost }.toSet()
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+ }
+
+ override fun toString(): String = "Grid5000FailureModel"
+ }
+}
+
+/**
+ * Obtain the [HostFaultInjector] to use for the experiments.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun createFaultInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ hosts: Set<SimHost>,
+ seed: Int,
+ failureInterval: Double
+): HostFaultInjector {
+ val rng = Well19937c(seed)
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+}
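
Usage sketch for grid5000 above; the Duration is the mean time between host failures (converted to hours internally) and the seed drives the stochastic victim selection. Values mirror the updated integration test; the surrounding simulation context is assumed:

    val failureModel = grid5000(Duration.ofDays(7), seed = 1)
    // Inside a simulation, the simulator (or any caller) turns the model into an injector:
    val injector = failureModel.createInjector(coroutineContext, clock, service)
    injector.start()
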
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index cf88535d..f4cf3e5e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -40,15 +38,19 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
import java.util.*
+import kotlin.math.roundToLong
/**
* An integration test suite for the Capelin experiments.
@@ -60,11 +62,20 @@ class CapelinIntegrationTest {
private lateinit var monitor: TestExperimentReporter
/**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
monitor = TestExperimentReporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
}
/**
@@ -72,26 +83,26 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -106,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
{ assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
- { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
- { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
- { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
+ { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } },
+ { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } },
+ { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } },
- { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
+ { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
)
}
@@ -120,27 +131,26 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -151,9 +161,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
+ { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -164,10 +174,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
@@ -177,20 +183,24 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ interferenceModel = performanceInterferenceModel
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -201,10 +211,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
+ { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -214,39 +224,27 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val faultInjector =
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seed,
- 24.0 * 7,
- )
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector.start()
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
-
- faultInjector.close()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ grid5000(Duration.ofDays(7), seed)
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -257,9 +255,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -291,10 +289,10 @@ class CapelinIntegrationTest {
var totalPowerDraw = 0.0
override fun record(data: HostData) {
- this.totalWork += data.totalWork.toLong()
- totalGrantedWork += data.grantedWork.toLong()
- totalOvercommittedWork += data.overcommittedWork.toLong()
- totalInterferedWork += data.interferedWork.toLong()
+ this.totalWork += data.totalWork.roundToLong()
+ totalGrantedWork += data.grantedWork.roundToLong()
+ totalOvercommittedWork += data.overcommittedWork.roundToLong()
+ totalInterferedWork += data.interferedWork.roundToLong()
totalPowerDraw += data.powerDraw
}
}
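
The accumulator above now uses roundToLong() instead of toLong(), which is part of why the expected totals in the assertions shifted by small amounts: toLong() truncates toward zero, while roundToLong() rounds to the nearest whole number. A small illustration (kotlin.math.roundToLong):

    2.7.toLong()        // 2, truncated toward zero
    2.7.roundToLong()   // 3, rounded to the nearest long
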
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
index 650416f5..3312d6c0 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
@@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
+import java.time.Duration
import java.util.*
import kotlin.math.max
@@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") {
val delayInjector = StochasticDelayInjector(coldStartModel, Random())
val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) }
val service =
- FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000))
+ FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)))
val client = service.newClient()
coroutineScope {
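
Finally, the serverless experiment now passes the function termination timeout as a java.time.Duration rather than a raw millisecond count; the two forms are equivalent:

    val timeout: Duration = Duration.ofMinutes(10)
    check(timeout.toMillis() == 10L * 60 * 1000)   // same value as the old literal
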