diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
9 files changed, 572 insertions, 399 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt deleted file mode 100644 index 8227bca9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import kotlinx.coroutines.* -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.api.* -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set<SimHost>, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} - -/** - * Construct the environment for a simulated compute service.. - */ -suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - environmentReader: EnvironmentReader, - scheduler: ComputeScheduler, - interferenceModel: VmInterferenceModel? = null, - block: suspend CoroutineScope.(ComputeService) -> Unit -): Unit = coroutineScope { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = environmentReader - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace( - clock: Clock, - reader: TraceReader<SimWorkload>, - scheduler: ComputeService, - monitor: ComputeMonitor? = null, -) { - val client = scheduler.newClient() - val watcher = object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) - } - } - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - server.watch(watcher) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - reader.close() - client.close() - } -} - -/** - * Create a [MeterProvider] instance for the experiment. - */ -fun createMeterProvider(clock: Clock): MeterProvider { - return SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() -} - -/** - * Create a [ComputeScheduler] for the experiment. - */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = java.util.Random(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 82794471..f7f9336e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,10 +23,7 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload @@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.io.FileInputStream +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom +import kotlin.math.roundToLong /** * A portfolio represents a collection of scenarios are tested for the work. @@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) { /** * Perform a single trial for this portfolio. */ - @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) - - val meterProvider = createMeterProvider(clock) val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } } else { listOf(workload.name) } - val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(FileInputStream(config.getString("interference-model"))) @@ -126,43 +122,36 @@ abstract class Portfolio(name: String) : Experiment(name) { else null - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) + val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + else + null + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + performanceInterferenceModel + ) val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> - val faultInjector = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - operationalPhenomena.failureFrequency, - ) - } else { - null - } - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - - faultInjector?.close() - monitor.close() + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + "SUBMIT=${monitorResults.instanceCount} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 7062a275..fa00fc35 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -28,7 +28,6 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import java.io.File @@ -46,8 +45,8 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: HostData) { builder["timestamp"] = data.timestamp - builder["host_id"] = data.host.name - builder["powered_on"] = data.host.state == HostState.UP + builder["host_id"] = data.host.id + builder["powered_on"] = true builder["uptime"] = data.uptime builder["downtime"] = data.downtime builder["total_work"] = data.totalWork @@ -58,7 +57,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["cpu_demand"] = data.cpuDemand builder["power_draw"] = data.powerDraw builder["num_instances"] = data.instanceCount - builder["num_cpus"] = data.host.model.cpuCount + builder["num_cpus"] = data.host.cpuCount } override fun toString(): String = "host-writer" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index 9904adde..bb2db4b7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -46,12 +46,12 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: ServerData) { builder["timestamp"] = data.timestamp - builder["server_id"] = data.server.uid.toString() - builder["state"] = data.server.state + builder["server_id"] = data.server + // builder["state"] = data.server.state builder["uptime"] = data.uptime builder["downtime"] = data.downtime - builder["num_vcpus"] = data.server.flavor.cpuCount - builder["mem_capacity"] = data.server.flavor.memorySize + // builder["num_vcpus"] = data.server.flavor.cpuCount + // builder["mem_capacity"] = data.server.flavor.memorySize } override fun toString(): String = "server-writer" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt new file mode 100644 index 00000000..3b7c3f0f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (allocationPolicy) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt new file mode 100644 index 00000000..065a8c93 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to manage a [ComputeService] simulation. + */ +class ComputeServiceSimulator( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + machines: List<MachineDef>, + private val failureModel: FailureModel? = null, + interferenceModel: VmInterferenceModel? = null, + hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + val producers: List<MetricProducer> + get() = _metricProducers + private val _metricProducers = mutableListOf<MetricProducer>() + + /** + * The [SimResourceInterpreter] to simulate the hosts. + */ + private val interpreter = SimResourceInterpreter(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf<SimHost>() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + + for (def in machines) { + val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) + this._metricProducers.add(hostMeterProvider) + hosts.add(host) + this.service.addHost(host) + } + } + + /** + * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. + */ + suspend fun run(reader: TraceReader<SimWorkload>) { + val injector = failureModel?.createInjector(context, clock, service) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } + delay(max(0, (entry.start - offset) - clock.millis())) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + reader.close() + client.close() + } + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } + + /** + * Construct a [SimHost] instance for the specified [MachineDef]. + */ + private fun createHost( + def: MachineDef, + hypervisorProvider: SimHypervisorProvider, + interferenceModel: VmInterferenceModel? = null + ): Pair<SimHost, SdkMeterProvider> { + val resource = Resource.builder() + .put(HOST_ID, def.uid.toString()) + .put(HOST_NAME, def.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, def.model.cpus.size) + .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val host = SimHost( + def.uid, + def.name, + def.model, + def.meta, + context, + interpreter, + meterProvider, + hypervisorProvider, + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() + ) + + return host to meterProvider + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt new file mode 100644 index 00000000..83393896 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt new file mode 100644 index 00000000..89b4a31c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.experiments.capelin + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import org.opendc.experiments.capelin.util.FailureModel +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln +import kotlin.random.Random + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun grid5000(failureInterval: Duration, seed: Int): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService + ): HostFaultInjector { + val rng = Well19937c(seed) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} + +/** + * Obtain the [HostFaultInjector] to use for the experiments. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun createFaultInjector( + context: CoroutineContext, + clock: Clock, + hosts: Set<SimHost>, + seed: Int, + failureInterval: Double +): HostFaultInjector { + val rng = Well19937c(seed) + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index cf88535d..f4cf3e5e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -40,15 +38,19 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration import java.util.* +import kotlin.math.roundToLong /** * An integration test suite for the Capelin experiments. @@ -60,11 +62,20 @@ class CapelinIntegrationTest { private lateinit var monitor: TestExperimentReporter /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** * Setup the experimental environment. */ @BeforeEach fun setUp() { monitor = TestExperimentReporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) } /** @@ -72,26 +83,26 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -106,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, - { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, + { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, + { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, + { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, ) } @@ -120,27 +131,26 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -151,9 +161,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -164,10 +174,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") @@ -177,20 +183,24 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + interferenceModel = performanceInterferenceModel + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -201,10 +211,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -214,39 +224,27 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val faultInjector = - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seed, - 24.0 * 7, - ) - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector.start() - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } - - faultInjector.close() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + grid5000(Duration.ofDays(7), seed) + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -257,9 +255,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -291,10 +289,10 @@ class CapelinIntegrationTest { var totalPowerDraw = 0.0 override fun record(data: HostData) { - this.totalWork += data.totalWork.toLong() - totalGrantedWork += data.grantedWork.toLong() - totalOvercommittedWork += data.overcommittedWork.toLong() - totalInterferedWork += data.interferedWork.toLong() + this.totalWork += data.totalWork.roundToLong() + totalGrantedWork += data.grantedWork.roundToLong() + totalOvercommittedWork += data.overcommittedWork.roundToLong() + totalInterferedWork += data.interferedWork.roundToLong() totalPowerDraw += data.powerDraw } } |
