Diffstat (limited to 'opendc-experiments')
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt  256
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt  77
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt (renamed from opendc-experiments/opendc-experiments-energy21/build.gradle.kts)  39
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt  124
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt  14
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt  86
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt  95
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt  46
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt  86
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt  222
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt  38
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt  97
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt  262
-rw-r--r--  opendc-experiments/opendc-experiments-energy21/.gitignore  3
-rw-r--r--  opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt  208
-rw-r--r--  opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf  14
-rw-r--r--  opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt  3
17 files changed, 889 insertions, 781 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
deleted file mode 100644
index 8227bca9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin
-
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import kotlinx.coroutines.*
-import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.commons.math3.random.Well19937c
-import org.opendc.compute.api.*
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.ReplayScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
-import org.opendc.compute.service.scheduler.weights.RamWeigher
-import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import org.opendc.compute.simulator.failure.StartStopHostFault
-import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.sdk.toOtelClock
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
-
-/**
- * Obtain the [FaultInjector] to use for the experiments.
- */
-fun createFaultInjector(
- context: CoroutineContext,
- clock: Clock,
- hosts: Set<SimHost>,
- seed: Int,
- failureInterval: Double
-): HostFaultInjector {
- val rng = Well19937c(seed)
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
-}
-
-/**
- * Construct the environment for a simulated compute service..
- */
-suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- environmentReader: EnvironmentReader,
- scheduler: ComputeScheduler,
- interferenceModel: VmInterferenceModel? = null,
- block: suspend CoroutineScope.(ComputeService) -> Unit
-): Unit = coroutineScope {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = environmentReader
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- interpreter,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
-}
-
-/**
- * Process the trace.
- */
-suspend fun processTrace(
- clock: Clock,
- reader: TraceReader<SimWorkload>,
- scheduler: ComputeService,
- monitor: ComputeMonitor? = null,
-) {
- val client = scheduler.newClient()
- val watcher = object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.onStateChange(clock.millis(), server, newState)
- }
- }
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- var offset = Long.MIN_VALUE
-
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
-
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
-
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
- server.watch(watcher)
-
- // Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
-
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
- }
- }
- }
-
- yield()
- } finally {
- reader.close()
- client.close()
- }
-}
-
-/**
- * Create a [MeterProvider] instance for the experiment.
- */
-fun createMeterProvider(clock: Clock): MeterProvider {
- return SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-}
-
-/**
- * Create a [ComputeScheduler] for the experiment.
- */
-fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
- val cpuAllocationRatio = 16.0
- val ramAllocationRatio = 1.5
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = 1.0))
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = -1.0))
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = -1.0))
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = emptyList(),
- subsetSize = Int.MAX_VALUE,
- random = java.util.Random(seeder.nextLong())
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 82794471..6261ebbf 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,10 +23,7 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.io.FileInputStream
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.asKotlinRandom
+import kotlin.math.roundToLong
/**
 * A portfolio represents a collection of scenarios that are tested for the work.
@@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) {
/**
* Perform a single trial for this portfolio.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
-
- val meterProvider = createMeterProvider(clock)
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
} else {
listOf(workload.name)
}
-
val rawReaders = workloadNames.map { workloadName ->
traceReaders.computeIfAbsent(workloadName) {
logger.info { "Loading trace $workloadName" }
RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
}
}
-
+ val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
.read(FileInputStream(config.getString("interference-model")))
@@ -126,49 +122,44 @@ abstract class Portfolio(name: String) : Experiment(name) {
else
null
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
+ val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
+ val failureModel =
+ if (operationalPhenomena.failureFrequency > 0)
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ else
+ null
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ performanceInterferenceModel
+ )
val monitor = ParquetExportMonitor(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
- withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
- val faultInjector = if (operationalPhenomena.failureFrequency > 0) {
- logger.debug("ENABLING failures")
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- operationalPhenomena.failureFrequency,
- )
- } else {
- null
- }
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
- }
-
- faultInjector?.close()
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0])
logger.debug {
- "Finish " +
- "SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.activeHostCount}"
+ "Scheduler " +
+ "Success=${monitorResults.attemptsSuccess} " +
+ "Failure=${monitorResults.attemptsFailure} " +
+ "Error=${monitorResults.attemptsError} " +
+ "Pending=${monitorResults.serversPending} " +
+ "Active=${monitorResults.serversActive}"
}
}
}
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
index cc58e5f1..a4676f31 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
@@ -20,28 +20,25 @@
* SOFTWARE.
*/
-description = "Experiments for the OpenDC Energy work"
+@file:JvmName("AvroUtils")
+package org.opendc.experiments.capelin.export.parquet
-/* Build configuration */
-plugins {
- `experiment-conventions`
- `testing-conventions`
-}
+import org.apache.avro.LogicalTypes
+import org.apache.avro.Schema
-dependencies {
- api(platform(projects.opendcPlatform))
- api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcExperiments.opendcExperimentsCapelin)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(projects.opendcTelemetry.opendcTelemetryCompute)
- implementation(libs.kotlin.logging)
- implementation(libs.config)
+/**
+ * Schema for UUID type.
+ */
+internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
- implementation(libs.parquet) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
+/**
+ * Schema for timestamp type.
+ */
+internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+
+/**
+ * Helper function to make a [Schema] field optional.
+ */
+internal fun Schema.optional(): Schema {
+ return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index c5cb80e2..e3d15c3b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -25,11 +25,12 @@ package org.opendc.experiments.capelin.export.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.Closeable
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
@@ -38,50 +39,94 @@ import kotlin.concurrent.thread
/**
* A writer that writes data in Parquet format.
*/
-public open class ParquetDataWriter<in T>(
- private val path: File,
+abstract class ParquetDataWriter<in T>(
+ path: File,
private val schema: Schema,
- private val converter: (T, GenericData.Record) -> Unit,
- private val bufferSize: Int = 4096
-) : Runnable, Closeable {
+ bufferSize: Int = 4096
+) : AutoCloseable {
/**
* The logging instance to use.
*/
private val logger = KotlinLogging.logger {}
/**
- * The writer to write the Parquet file.
+ * The queue of commands to process.
*/
- private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
/**
- * The queue of commands to process.
+ * An exception to be propagated to the actual writer.
*/
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+ private var exception: Throwable? = null
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = false, name = this.toString()) { run() }
+ private val writerThread = thread(start = false, name = this.toString()) {
+ val writer = let {
+ val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
+
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
+
+ try {
+ while (!shouldStop) {
+ try {
+ process(writer, queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
+
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ process(writer, data)
+ }
+ buf.clear()
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Build the [ParquetWriter] used to write the Parquet files.
+ */
+ protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder.build()
+ }
+
+ /**
+ * Convert the specified [data] into a Parquet record.
+ */
+ protected abstract fun convert(builder: GenericRecordBuilder, data: T)
/**
* Write the specified metrics to the database.
*/
- public fun write(event: T) {
- queue.put(Action.Write(event))
+ fun write(data: T) {
+ val exception = exception
+ if (exception != null) {
+ throw IllegalStateException("Writer thread failed", exception)
+ }
+
+ queue.put(data)
}
/**
* Signal the writer to stop.
*/
- public override fun close() {
- queue.put(Action.Stop)
+ override fun close() {
+ writerThread.interrupt()
writerThread.join()
}
@@ -90,38 +135,11 @@ public open class ParquetDataWriter<in T>(
}
/**
- * Start the writer thread.
+ * Process the specified [data] to be written to the Parquet file.
*/
- override fun run() {
- try {
- loop@ while (true) {
- val action = queue.take()
- when (action) {
- is Action.Stop -> break@loop
- is Action.Write<*> -> {
- val record = GenericData.Record(schema)
- @Suppress("UNCHECKED_CAST")
- converter(action.data as T, record)
- writer.write(record)
- }
- }
- }
- } catch (e: Throwable) {
- logger.error("Writer failed", e)
- } finally {
- writer.close()
- }
- }
-
- public sealed class Action {
- /**
- * A poison pill that will stop the writer thread.
- */
- public object Stop : Action()
-
- /**
- * Write the specified metrics to the database.
- */
- public data class Write<out T>(val data: T) : Action()
+ private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ writer.write(builder.build())
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
index 79b84e9d..b057e932 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
@@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val serverWriter = ParquetServerDataWriter(
+ File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
private val hostWriter = ParquetHostDataWriter(
File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
)
+
private val serviceWriter = ParquetServiceDataWriter(
File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
)
+ override fun record(data: ServerData) {
+ serverWriter.write(data)
+ }
+
override fun record(data: HostData) {
hostWriter.write(data)
}
@@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int
override fun close() {
hostWriter.close()
serviceWriter.close()
+ serverWriter.close()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 8912c12e..58388cb1 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -25,6 +25,9 @@ package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.telemetry.compute.table.HostData
import java.io.File
@@ -32,42 +35,67 @@ import java.io.File
* A Parquet event writer for [HostData]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, schema, convert, bufferSize) {
+ ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
- override fun toString(): String = "host-writer"
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
- public companion object {
- private val convert: (HostData, GenericData.Record) -> Unit = { data, record ->
- record.put("host_id", data.host.name)
- record.put("state", data.host.state.name)
- record.put("timestamp", data.timestamp)
- record.put("total_work", data.totalWork)
- record.put("granted_work", data.grantedWork)
- record.put("overcommitted_work", data.overcommittedWork)
- record.put("interfered_work", data.interferedWork)
- record.put("cpu_usage", data.cpuUsage)
- record.put("cpu_demand", data.cpuDemand)
- record.put("power_draw", data.powerDraw)
- record.put("instance_count", data.instanceCount)
- record.put("cores", data.host.model.cpuCount)
+ builder["host_id"] = data.host.id
+
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ builder["boot_time"] = bootTime.toEpochMilli()
}
- private val schema: Schema = SchemaBuilder
+ builder["cpu_count"] = data.host.cpuCount
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["mem_limit"] = data.host.memCapacity
+
+ builder["power_total"] = data.powerTotal
+
+ builder["guests_terminated"] = data.guestsTerminated
+ builder["guests_running"] = data.guestsRunning
+ builder["guests_error"] = data.guestsError
+ builder["guests_invalid"] = data.guestsInvalid
+ }
+
+ override fun toString(): String = "host-writer"
+
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
.record("host")
.namespace("org.opendc.telemetry.compute")
.fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("requested_work").type().longType().noDefault()
- .name("granted_work").type().longType().noDefault()
- .name("overcommitted_work").type().longType().noDefault()
- .name("interfered_work").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("instance_count").type().intType().noDefault()
- .name("cores").type().intType().noDefault()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA).noDefault()
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
+ .requiredDouble("power_total")
+ .requiredInt("guests_terminated")
+ .requiredInt("guests_running")
+ .requiredInt("guests_error")
+ .requiredInt("guests_invalid")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..43b5f469
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.ServerData
+import java.io.File
+import java.util.*
+
+/**
+ * A Parquet event writer for [ServerData]s.
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("server_id", true)
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+
+ builder["server_id"] = data.server.id
+ builder["host_id"] = data.host?.id
+
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ builder["boot_time"] = bootTime.toEpochMilli()
+ }
+ builder["scheduling_latency"] = data.schedulingLatency
+
+ builder["cpu_count"] = data.server.cpuCount
+ builder["cpu_limit"] = data.cpuLimit
+ builder["cpu_time_active"] = data.cpuActiveTime
+ builder["cpu_time_idle"] = data.cpuIdleTime
+ builder["cpu_time_steal"] = data.cpuStealTime
+ builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["mem_limit"] = data.server.memCapacity
+ }
+
+ override fun toString(): String = "server-writer"
+
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
+ .record("server")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("server_id").type(UUID_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+ .requiredLong("scheduling_latency")
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_limit")
+ .requiredLong("cpu_time_active")
+ .requiredLong("cpu_time_idle")
+ .requiredLong("cpu_time_steal")
+ .requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 36d630f3..2928f445 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
@@ -32,34 +32,34 @@ import java.io.File
* A Parquet event writer for [ServiceData]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) {
+ ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
- override fun toString(): String = "service-writer"
+ override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ builder["timestamp"] = data.timestamp.toEpochMilli()
+ builder["hosts_up"] = data.hostsUp
+ builder["hosts_down"] = data.hostsDown
+ builder["servers_pending"] = data.serversPending
+ builder["servers_active"] = data.serversActive
+ builder["attempts_success"] = data.attemptsSuccess
+ builder["attempts_failure"] = data.attemptsFailure
+ builder["attempts_error"] = data.attemptsError
+ }
- public companion object {
- private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record ->
- record.put("timestamp", data.timestamp)
- record.put("host_total_count", data.hostCount)
- record.put("host_available_count", data.activeHostCount)
- record.put("instance_total_count", data.instanceCount)
- record.put("instance_active_count", data.runningInstanceCount)
- record.put("instance_inactive_count", data.finishedInstanceCount)
- record.put("instance_waiting_count", data.queuedInstanceCount)
- record.put("instance_failed_count", data.failedInstanceCount)
- }
+ override fun toString(): String = "service-writer"
- private val schema: Schema = SchemaBuilder
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
.record("service")
.namespace("org.opendc.telemetry.compute")
.fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_total_count").type().intType().noDefault()
- .name("host_available_count").type().intType().noDefault()
- .name("instance_total_count").type().intType().noDefault()
- .name("instance_active_count").type().intType().noDefault()
- .name("instance_inactive_count").type().intType().noDefault()
- .name("instance_waiting_count").type().intType().noDefault()
- .name("instance_failed_count").type().intType().noDefault()
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredInt("hosts_up")
+ .requiredInt("hosts_down")
+ .requiredInt("servers_pending")
+ .requiredInt("servers_active")
+ .requiredInt("attempts_success")
+ .requiredInt("attempts_failure")
+ .requiredInt("attempts_error")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
new file mode 100644
index 00000000..3b7c3f0f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
new file mode 100644
index 00000000..065a8c93
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.env.MachineDef
+import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.*
+import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.max
+
+/**
+ * Helper class to manage a [ComputeService] simulation.
+ */
+class ComputeServiceSimulator(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ scheduler: ComputeScheduler,
+ machines: List<MachineDef>,
+ private val failureModel: FailureModel? = null,
+ interferenceModel: VmInterferenceModel? = null,
+ hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
+) : AutoCloseable {
+ /**
+ * The [ComputeService] that has been configured by the manager.
+ */
+ val service: ComputeService
+
+ /**
+ * The [MetricProducer]s that are used by the [ComputeService] and the simulated hosts.
+ */
+ val producers: List<MetricProducer>
+ get() = _metricProducers
+ private val _metricProducers = mutableListOf<MetricProducer>()
+
+ /**
+ * The [SimResourceInterpreter] to simulate the hosts.
+ */
+ private val interpreter = SimResourceInterpreter(context, clock)
+
+ /**
+ * The hosts that belong to this class.
+ */
+ private val hosts = mutableSetOf<SimHost>()
+
+ init {
+ val (service, serviceMeterProvider) = createService(scheduler)
+ this._metricProducers.add(serviceMeterProvider)
+ this.service = service
+
+ for (def in machines) {
+ val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
+ this._metricProducers.add(hostMeterProvider)
+ hosts.add(host)
+ this.service.addHost(host)
+ }
+ }
+
+ /**
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ */
+ suspend fun run(reader: TraceReader<SimWorkload>) {
+ val injector = failureModel?.createInjector(context, clock, service)
+ val client = service.newClient()
+
+ // Create new image for the virtual machine
+ val image = client.newImage("vm-image")
+
+ try {
+ coroutineScope {
+ // Start the fault injector
+ injector?.start()
+
+ var offset = Long.MIN_VALUE
+
+ while (reader.hasNext()) {
+ val entry = reader.next()
+
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ // Make sure the trace entries are ordered by submission time
+ assert(entry.start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (entry.start - offset) - clock.millis()))
+
+ launch {
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta + mapOf("workload" to workload)
+ )
+
+ // Wait for the server to reach its end time
+ val endTime = entry.meta["end-time"] as Long
+ delay(endTime + workloadOffset - clock.millis() + 1)
+
+ // Delete the server after reaching the end-time of the virtual machine
+ server.delete()
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ injector?.close()
+ reader.close()
+ client.close()
+ }
+ }
+
+ override fun close() {
+ service.close()
+
+ for (host in hosts) {
+ host.close()
+ }
+
+ hosts.clear()
+ }
+
+ /**
+ * Construct a [ComputeService] instance.
+ */
+ private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val service = ComputeService(context, clock, meterProvider, scheduler)
+ return service to meterProvider
+ }
+
+ /**
+ * Construct a [SimHost] instance for the specified [MachineDef].
+ */
+ private fun createHost(
+ def: MachineDef,
+ hypervisorProvider: SimHypervisorProvider,
+ interferenceModel: VmInterferenceModel? = null
+ ): Pair<SimHost, SdkMeterProvider> {
+ val resource = Resource.builder()
+ .put(HOST_ID, def.uid.toString())
+ .put(HOST_NAME, def.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, def.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+
+ val host = SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ context,
+ interpreter,
+ meterProvider,
+ hypervisorProvider,
+ powerDriver = SimplePowerDriver(def.powerModel),
+ interferenceDomain = interferenceModel?.newDomain()
+ )
+
+ return host to meterProvider
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
new file mode 100644
index 00000000..83393896
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.util
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
+ */
+interface FailureModel {
+ /**
+ * Construct a [HostFaultInjector] for the specified [service].
+ */
+ fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
new file mode 100644
index 00000000..89b4a31c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureModels")
+package org.opendc.experiments.capelin
+
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
+import org.opendc.experiments.capelin.util.FailureModel
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+import kotlin.random.Random
+
+/**
+ * Obtain a [FailureModel] based on the GRID'5000 failure trace.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
+ return object : FailureModel {
+ override fun createInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ service: ComputeService
+ ): HostFaultInjector {
+ val rng = Well19937c(seed)
+ val hosts = service.hosts.map { it as SimHost }.toSet()
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+ }
+
+ override fun toString(): String = "Grid5000FailureModel"
+ }
+}
+
+/**
+ * Obtain the [HostFaultInjector] to use for the experiments.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+fun createFaultInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ hosts: Set<SimHost>,
+ seed: Int,
+ failureInterval: Double
+): HostFaultInjector {
+ val rng = Well19937c(seed)
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 44cf92a8..727530e3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -40,14 +38,17 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
import java.util.*
/**
@@ -60,11 +61,20 @@ class CapelinIntegrationTest {
private lateinit var monitor: TestExperimentReporter
/**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
monitor = TestExperimentReporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
}
/**
@@ -72,45 +82,46 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") },
- { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
- { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
- { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
- { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
- { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
- { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } },
- { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should be in the queue") },
+ { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect energy usage" } },
)
}
@@ -120,41 +131,41 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -164,10 +175,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
@@ -177,34 +184,39 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ interferenceModel = performanceInterferenceModel
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -214,53 +226,43 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val faultInjector =
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seed,
- 24.0 * 7,
- )
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector.start()
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
-
- faultInjector.close()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ grid5000(Duration.ofDays(7), seed)
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
println(
- "Finish " +
- "SUBMIT=${serviceMetrics.instanceCount} " +
- "FAIL=${serviceMetrics.failedInstanceCount} " +
- "QUEUE=${serviceMetrics.queuedInstanceCount} " +
- "RUNNING=${serviceMetrics.runningInstanceCount}"
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -284,18 +286,20 @@ class CapelinIntegrationTest {
}
class TestExperimentReporter : ComputeMonitor {
- var totalWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
- var totalInterferedWork = 0L
- var totalPowerDraw = 0.0
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ var lostTime = 0L
+ var energyUsage = 0.0
+ var uptime = 0L
override fun record(data: HostData) {
- this.totalWork += data.totalWork.toLong()
- totalGrantedWork += data.grantedWork.toLong()
- totalOvercommittedWork += data.overcommittedWork.toLong()
- totalInterferedWork += data.interferedWork.toLong()
- totalPowerDraw += data.powerDraw
+ idleTime += data.cpuIdleTime
+ activeTime += data.cpuActiveTime
+ stealTime += data.cpuStealTime
+ lostTime += data.cpuLostTime
+ energyUsage += data.powerTotal
+ uptime += data.uptime
}
}
}
diff --git a/opendc-experiments/opendc-experiments-energy21/.gitignore b/opendc-experiments/opendc-experiments-energy21/.gitignore
deleted file mode 100644
index 55da79f8..00000000
--- a/opendc-experiments/opendc-experiments-energy21/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-input/
-output/
-.ipynb_checkpoints
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
deleted file mode 100644
index d9194969..00000000
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.energy21
-
-import com.typesafe.config.ConfigFactory
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.coroutineScope
-import mu.KotlinLogging
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.simulator.SimHost
-import org.opendc.experiments.capelin.*
-import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
-import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader
-import org.opendc.harness.dsl.Experiment
-import org.opendc.harness.dsl.anyOf
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.*
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
-import java.io.File
-import java.time.Clock
-import java.util.*
-
-/**
- * Experiments for the OpenDC project on Energy modeling.
- */
-public class EnergyExperiment : Experiment("Energy Modeling 2021") {
- /**
- * The logger for this portfolio instance.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The configuration to use.
- */
- private val config = ConfigFactory.load().getConfig("opendc.experiments.energy21")
-
- /**
- * The traces to test.
- */
- private val trace by anyOf("solvinity")
-
- /**
- * The power models to test.
- */
- private val powerModel by anyOf(PowerModelType.LINEAR, PowerModelType.CUBIC, PowerModelType.INTERPOLATION)
-
- override fun doRun(repeat: Int): Unit = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
- weighers = listOf(),
- subsetSize = Int.MAX_VALUE
- )
-
- val meterProvider: MeterProvider = createMeterProvider(clock)
- val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
- val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace))
-
- withComputeService(clock, meterProvider, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
- }
- }
-
- val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
- logger.debug {
- "Finish SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.runningInstanceCount}"
- }
- }
-
- /**
- * Construct the environment for a simulated compute service..
- */
- public suspend fun withComputeService(
- clock: Clock,
- meterProvider: MeterProvider,
- scheduler: ComputeScheduler,
- block: suspend CoroutineScope.(ComputeService) -> Unit
- ): Unit = coroutineScope {
- val model = createMachineModel()
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = List(64) { id ->
- SimHost(
- UUID(0, id.toLong()),
- "node-$id",
- model,
- emptyMap(),
- coroutineContext,
- interpreter,
- meterProvider.get("opendc-compute-simulator"),
- SimFairShareHypervisorProvider(),
- PerformanceScalingGovernor(),
- powerModel.driver
- )
- }
-
- val serviceMeter = meterProvider.get("opendc-compute")
- val service =
- ComputeService(coroutineContext, clock, serviceMeter, scheduler)
-
- for (host in hosts) {
- service.addHost(host)
- }
-
- try {
- block(this, service)
- } finally {
- service.close()
- hosts.forEach(SimHost::close)
- }
- }
-
- /**
- * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
- */
- private fun createMachineModel(): MachineModel {
- val node = ProcessingNode("AMD", "am64", "EPYC 7742", 64)
- val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) }
- val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) }
-
- return MachineModel(cpus, memory)
- }
-
- /**
- * The power models to test.
- */
- public enum class PowerModelType {
- CUBIC {
- override val driver: PowerDriver = SimplePowerDriver(CubicPowerModel(206.0, 56.4))
- },
-
- LINEAR {
- override val driver: PowerDriver = SimplePowerDriver(LinearPowerModel(206.0, 56.4))
- },
-
- SQRT {
- override val driver: PowerDriver = SimplePowerDriver(SqrtPowerModel(206.0, 56.4))
- },
-
- SQUARE {
- override val driver: PowerDriver = SimplePowerDriver(SquarePowerModel(206.0, 56.4))
- },
-
- INTERPOLATION {
- override val driver: PowerDriver = SimplePowerDriver(
- InterpolationPowerModel(
- listOf(56.4, 100.0, 107.0, 117.0, 127.0, 138.0, 149.0, 162.0, 177.0, 191.0, 206.0)
- )
- )
- },
-
- MSE {
- override val driver: PowerDriver = SimplePowerDriver(MsePowerModel(206.0, 56.4, 1.4))
- },
-
- ASYMPTOTIC {
- override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false))
- },
-
- ASYMPTOTIC_DVFS {
- override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true))
- };
-
- public abstract val driver: PowerDriver
- }
-}
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf b/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf
deleted file mode 100644
index 263da0fe..00000000
--- a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf
+++ /dev/null
@@ -1,14 +0,0 @@
-# Default configuration for the energy experiments
-opendc.experiments.energy21 {
- # Path to the directory containing the input traces
- trace-path = input/traces
-
- # Path to the output directory to write the results to
- output-path = output
-}
-
-opendc.experiments.capelin {
- env-path = input/environments/
- trace-path = input/traces/
- output-path = output
-}
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
index 650416f5..3312d6c0 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
@@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
+import java.time.Duration
import java.util.*
import kotlin.math.max
@@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") {
val delayInjector = StochasticDelayInjector(coldStartModel, Random())
val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) }
val service =
- FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000))
+ FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)))
val client = service.newClient()
coroutineScope {