diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-17 22:00:20 +0200 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-05-17 22:00:20 +0200 |
| commit | c78d99c198e48dfa1b9f0f22f936ebc9457e5a5b (patch) | |
| tree | 633cfa8c2331c34c3572ee8e404ff80567010d5a | |
| parent | 210ca9d4b82cda82d0651e539c38d36eb61aec1e (diff) | |
| parent | 2211cc2af7fced34921055e8d5fb5516457b4cc0 (diff) | |
Merge branch 'feat/database-metrics' into '2.x'
Implement experiment runner in Kotlin
Closes #63
See merge request opendc/opendc-simulator!66
57 files changed, 2757 insertions, 787 deletions
diff --git a/buildSrc/src/main/kotlin/library.kt b/buildSrc/src/main/kotlin/library.kt index 3b05f3a4..de88f114 100644 --- a/buildSrc/src/main/kotlin/library.kt +++ b/buildSrc/src/main/kotlin/library.kt @@ -45,5 +45,5 @@ object Library { /** * Kotlin coroutines support */ - val KOTLINX_COROUTINES = "1.3.5" + val KOTLINX_COROUTINES = "1.3.6" } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index 45024a49..f458877b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -40,30 +40,12 @@ const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" * @param items The [PerformanceInterferenceModelItem]s that make up this model. */ class PerformanceInterferenceModel( - items: Set<PerformanceInterferenceModelItem>, + val items: SortedSet<PerformanceInterferenceModelItem>, val random: Random = Random(0) ) { private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList() - private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs -> - var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) - if (cmp != 0) { - return@Comparator cmp - } - - cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) - if (cmp != 0) { - return@Comparator cmp - } - - lhs.hashCode().compareTo(rhs.hashCode()) - } - val items = TreeSet(comparator) private val colocatedWorkloads = TreeSet<String>() - init { - this.items.addAll(items) - } - fun vmStarted(server: Server) { colocatedWorkloads.add(server.image.name) intersectingItems = items.filter { item -> doesMatch(item) } @@ -113,7 +95,7 @@ data class PerformanceInterferenceModelItem( val workloadNames: SortedSet<String>, val minServerLoad: Double, val performanceScore: Double -) { +) : Comparable<PerformanceInterferenceModelItem> { override fun equals(other: Any?): Boolean { if (this === other) return true if (javaClass != other?.javaClass) return false @@ -126,4 +108,18 @@ data class PerformanceInterferenceModelItem( } override fun hashCode(): Int = workloadNames.hashCode() + + override fun compareTo(other: PerformanceInterferenceModelItem): Int { + var cmp = performanceScore.compareTo(other.performanceScore) + if (cmp != 0) { + return cmp + } + + cmp = minServerLoad.compareTo(other.minServerLoad) + if (cmp != 0) { + return cmp + } + + return hashCode().compareTo(other.hashCode()) + } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 53fa463b..ce814dd8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -339,7 +339,7 @@ class SimpleVirtDriver( min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing totalOvercommissionedBurst, totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, - totalAllocatedUsage, + min(totalAllocatedUsage, totalRequestedUsage), // The allocated usage might be slightly higher due to FP rounding totalRequestedUsage, vmCount, // Some VMs might already have finished, so keep initial VM count server diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 3603ae69..c3d9c745 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server @@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -57,11 +59,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - public var submittedVms = 0L - public var queuedVms = 0L - public var runningVms = 0L - public var finishedVms = 0L - public var unscheduledVms = 0L + public var submittedVms = 0 + public var queuedVms = 0 + public var runningVms = 0 + public var finishedVms = 0 + public var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -71,6 +73,13 @@ class SimpleVirtProvisioningService( */ private val allocationLogic = allocationPolicy() + /** + * The [EventFlow] to emit the events. + */ + internal val eventFlow = EventFlow<VirtProvisioningEvent>() + + override val events: Flow<VirtProvisioningEvent> = eventFlow + init { launch { val provisionedNodes = provisioningService.nodes() @@ -96,8 +105,17 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { - submittedVms++ - queuedVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + )) + suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -141,7 +159,17 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - unscheduledVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + )) + incomingImages -= imageInstance logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") @@ -168,8 +196,17 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) - queuedVms-- - runningVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + )) activeImages += imageInstance server.events @@ -178,8 +215,17 @@ class SimpleVirtProvisioningService( is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - runningVms-- - finishedVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + )) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -223,11 +269,33 @@ class SimpleVirtProvisioningService( maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + if (incomingImages.isNotEmpty()) { requestCycle() } @@ -242,6 +310,17 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + hv.driver.events .onEach { event -> if (event is HypervisorEvent.VmsUpdated) { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt new file mode 100644 index 00000000..c3fb99f9 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.service + +/** + * An event that is emitted by the [VirtProvisioningService]. + */ +public sealed class VirtProvisioningEvent { + /** + * The service that has emitted the event. + */ + public abstract val provisioner: VirtProvisioningService + + /** + * An event emitted for writing metrics. + */ + data class MetricsAvailable( + override val provisioner: VirtProvisioningService, + public val totalHostCount: Int, + public val availableHostCount: Int, + public val totalVmCount: Int, + public val activeVmCount: Int, + public val inactiveVmCount: Int, + public val waitingVmCount: Int, + public val failedVmCount: Int + ) : VirtProvisioningEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 2ad7df84..c4cbd711 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -5,6 +5,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. @@ -16,6 +17,11 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** + * The events emitted by the service. + */ + public val events: Flow<VirtProvisioningEvent> + + /** * Obtain the active hypervisors for this provisioner. */ public suspend fun drivers(): Set<VirtDriver> diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 6b6366a7..46d99564 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -31,23 +31,26 @@ plugins { } application { - mainClassName = "com.atlarge.opendc.experiments.sc20.Sc20ExperimentKt" - applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M") + mainClassName = "com.atlarge.opendc.experiments.sc20.MainKt" + applicationDefaultJvmArgs = listOf("-Xms2500M") } dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(kotlin("stdlib")) + implementation("com.github.ajalt:clikt:2.6.0") + implementation("me.tongfei:progressbar:0.8.1") implementation("io.github.microutils:kotlin-logging:1.7.9") + implementation("org.apache.parquet:parquet-avro:1.11.0") implementation("org.apache.hadoop:hadoop-client:3.2.1") { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") - runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql deleted file mode 100644 index 51990a75..00000000 --- a/opendc/opendc-experiments-sc20/schema.sql +++ /dev/null @@ -1,22 +0,0 @@ -DROP TABLE IF EXISTS host_reports; -CREATE TABLE host_reports ( - id BIGSERIAL PRIMARY KEY NOT NULL, - experiment_id BIGINT NOT NULL, - time BIGINT NOT NULL, - duration BIGINT NOT NULL, - requested_burst BIGINT NOT NULL, - granted_burst BIGINT NOT NULL, - overcommissioned_burst BIGINT NOT NULL, - interfered_burst BIGINT NOT NULL, - cpu_usage DOUBLE PRECISION NOT NULL, - cpu_demand DOUBLE PRECISION NOT NULL, - image_count INTEGER NOT NULL, - server TEXT NOT NULL, - host_state TEXT NOT NULL, - host_usage DOUBLE PRECISION NOT NULL, - power_draw DOUBLE PRECISION NOT NULL, - total_submitted_vms BIGINT NOT NULL, - total_queued_vms BIGINT NOT NULL, - total_running_vms BIGINT NOT NULL, - total_finished_vms BIGINT NOT NULL -); diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt new file mode 100644 index 00000000..14de52b8 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -0,0 +1,159 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.experiments.sc20.experiment.Experiment +import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.Portfolio +import com.atlarge.opendc.experiments.sc20.experiment.TestPortfolio +import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ThreadPoolExperimentScheduler +import com.atlarge.opendc.experiments.sc20.runner.internal.DefaultExperimentRunner +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.multiple +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.types.choice +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import mu.KotlinLogging +import java.io.File +import java.io.InputStream + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Represents the command for running the experiment. + */ +class ExperimentCli : CliktCommand(name = "sc20-experiment") { + /** + * The path to the directory where the topology descriptions are located. + */ + private val environmentPath by option("--environment-path", help = "path to the environment directory") + .file(canBeFile = false) + .required() + + /** + * The path to the directory where the traces are located. + */ + private val tracePath by option("--trace-path", help = "path to the traces directory") + .file(canBeFile = false) + .required() + + /** + * The path to the performance interference model. + */ + private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file") + .file() + .convert { it.inputStream() as InputStream } + + /** + * The path to the original VM placements file. + */ + private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") + .file() + .convert { + Sc20VmPlacementReader(it.inputStream().buffered()).construct() + } + .default(emptyMap()) + + /** + * The selected portfolios to run. + */ + private val portfolios by option("--portfolio") + .choice( + "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio, + "more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) }, + "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) }, + "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) }, + "test" to { experiment, i -> TestPortfolio(experiment, i) }, + ignoreCase = true + ) + .multiple() + + /** + * The maximum number of worker threads to use. + */ + private val parallelism by option("--parallelism") + .int() + .default(Runtime.getRuntime().availableProcessors()) + + /** + * The buffer size for writing results. + */ + private val bufferSize by option("--buffer-size") + .int() + .default(4096) + + /** + * The path to the output directory. + */ + private val output by option("-O", "--output", help = "path to the output directory") + .file(canBeFile = false) + .defaultLazy { File("data") } + + override fun run() { + logger.info { "Constructing performance interference model" } + + val performanceInterferenceModel = + performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it) } + + logger.info { "Creating experiment descriptor" } + val descriptor = object : Experiment(environmentPath, tracePath, output, performanceInterferenceModel, vmPlacements, bufferSize) { + private val descriptor = this + override val children: Sequence<ExperimentDescriptor> = sequence { + for ((i, producer) in portfolios.withIndex()) { + yield(producer(descriptor, i)) + } + } + } + + logger.info { "Starting experiment runner [parallelism=$parallelism]" } + val scheduler = ThreadPoolExperimentScheduler(parallelism) + val runner = DefaultExperimentRunner(scheduler) + try { + runner.execute(descriptor, ConsoleExperimentReporter()) + } finally { + scheduler.close() + } + } +} + +/** + * Main entry point of the experiment. + */ +fun main(args: Array<String>) = ExperimentCli().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt deleted file mode 100644 index 51448c9e..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ /dev/null @@ -1,235 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.default -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions -import com.github.ajalt.clikt.parameters.groups.required -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.flag -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.types.choice -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int -import com.github.ajalt.clikt.parameters.types.long -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import mu.KotlinLogging -import java.io.File -import java.io.FileReader -import java.io.InputStream -import java.sql.DriverManager -import java.util.ServiceLoader -import kotlin.random.Random - -/** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** - * Represents the command for running the experiment. - */ -class ExperimentCommand : CliktCommand(name = "sc20-experiment") { - private val environment by option("--environment-file", help = "path to the environment file") - .file() - .required() - private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file") - .file() - .convert { it.inputStream() as InputStream } - .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") } - - private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") - .file() - .convert { - Sc20VmPlacementReader(it.inputStream().buffered()).construct() - } - .default(emptyMap()) - - private val selectedVms by mutuallyExclusiveOptions( - option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) }, - option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) } - ).default(emptyList()) - - private val seed by option(help = "the random seed") - .int() - .default(0) - private val failures by option("-x", "--failures", help = "enable (correlated) machine failures") - .flag() - private val failureInterval by option(help = "expected number of hours between failures") - .int() - .default(24 * 7) // one week - private val allocationPolicy by option(help = "name of VM allocation policy to use") - .choice( - "mem", "mem-inv", - "core-mem", "core-mem-inv", - "active-servers", "active-servers-inv", - "provisioned-cores", "provisioned-cores-inv", - "random", "replay" - ) - .default("core-mem") - - private val trace by option("--trace-directory", help = "path to the trace directory") - .file(canBeFile = false) - .required() - - private val reporter by option().groupChoice( - "parquet" to Parquet(), - "postgres" to Postgres() - ).required() - - private fun parseVMs(string: String): List<String> { - // Handle case where VM list contains a VM name with an (escaped) single-quote in it - val sanitizedString = string.replace("\\'", "\\\\[") - .replace("'", "\"") - .replace("\\\\[", "'") - val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString) - return vms - } - - override fun run() { - logger.info("seed: $seed") - logger.info("failures: $failures") - logger.info("allocation-policy: $allocationPolicy") - - val start = System.currentTimeMillis() - val reporter: Sc20Reporter = reporter.createReporter() - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = when (this.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") - } - - val performanceInterferenceModel = try { - Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct() - } catch (e: Throwable) { - reporter.close() - throw e - } - val environmentReader = Sc20ClusterEnvironmentReader(environment) - val traceReader = try { - createTraceReader(trace, performanceInterferenceModel, selectedVms, seed) - } catch (e: Throwable) { - reporter.close() - throw e - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (failures) { - logger.info("ENABLING failures") - createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, reporter) - processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") - } - - runBlocking { - system.run() - system.terminate() - } - - // Explicitly close the monitor to flush its buffer - reporter.close() - } -} - -sealed class Reporter(name: String) : OptionGroup(name) { - /** - * Create the [Sc20Reporter] for this option. - */ - abstract fun createReporter(): Sc20Reporter -} - -class Parquet : Reporter("Options for reporting using Parquet") { - private val path by option(help = "path to where the output should be stored") - .file() - .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } - - override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path) -} - -class Postgres : Reporter("Options for reporting using PostgreSQL") { - private val url by option(help = "JDBC connection url").required() - private val experimentId by option(help = "Experiment ID").long().required() - - override fun createReporter(): Sc20Reporter { - val conn = DriverManager.getConnection(url) - return Sc20PostgresReporter(conn, experimentId) - } -} - -/** - * Main entry point of the experiment. - */ -fun main(args: Array<String>) = ExperimentCommand().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt deleted file mode 100644 index f2139144..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt +++ /dev/null @@ -1,151 +0,0 @@ -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first -import mu.KotlinLogging -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -private val logger = KotlinLogging.logger {} - -class Sc20ParquetReporter(destination: File) : Sc20Reporter { - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - private val schema = SchemaBuilder - .record("slice") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("image_count").type().intType().noDefault() - .name("server").type().stringType().noDefault() - .name("host_state").type().stringType().noDefault() - .name("host_usage").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("total_submitted_vms").type().longType().noDefault() - .name("total_queued_vms").type().longType().noDefault() - .name("total_running_vms").type().longType().noDefault() - .name("total_finished_vms").type().longType().noDefault() - .endRecord() - private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - private val queue = ArrayBlockingQueue<GenericData.Record>(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - try { - while (true) { - val record = queue.take() - writer.write(record) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - writer.close() - } - } - - override suspend fun reportVmStateChange(server: Server) {} - - override suspend fun reportHostStateChange( - driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long - ) { - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second - reportHostSlice( - simulationContext.clock.millis(), - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - submittedVms, - queuedVms, - runningVms, - finishedVms, - duration - ) - } - - logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") - - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) - } - - override suspend fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, - duration: Long - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - - val record = GenericData.Record(schema) - record.put("time", time) - record.put("duration", duration) - record.put("requested_burst", requestedBurst) - record.put("granted_burst", grantedBurst) - record.put("overcommissioned_burst", overcommissionedBurst) - record.put("interfered_burst", interferedBurst) - record.put("cpu_usage", cpuUsage) - record.put("cpu_demand", cpuDemand) - record.put("image_count", numberOfDeployedImages) - record.put("server", hostServer.uid) - record.put("host_state", hostServer.state) - record.put("host_usage", usage) - record.put("power_draw", powerDraw) - record.put("total_submitted_vms", submittedVms) - record.put("total_queued_vms", queuedVms) - record.put("total_running_vms", runningVms) - record.put("total_finished_vms", finishedVms) - - queue.put(record) - } - - override fun close() { - // Busy loop to wait for writer thread to finish - while (queue.isNotEmpty()) { - Thread.sleep(500) - } - writerThread.interrupt() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt deleted file mode 100644 index 5c5e6ceb..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt +++ /dev/null @@ -1,200 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first -import mu.KotlinLogging -import java.sql.Connection -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.atomic.AtomicBoolean -import kotlin.concurrent.thread - -private val logger = KotlinLogging.logger {} - -class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter { - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - private val queue = ArrayBlockingQueue<Report>(2048) - private val stop = AtomicBoolean(false) - private val writerThread = thread(start = true, name = "sc20-writer") { - val stmt = try { - conn.prepareStatement( - """ - INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """.trimIndent() - ) - } catch (e: Throwable) { - conn.close() - throw e - } - - val batchSize = 4096 - var batch = 0 - - try { - while (!stop.get()) { - val record = queue.take() - stmt.setLong(1, experimentId) - stmt.setLong(2, record.time) - stmt.setLong(3, record.duration) - stmt.setLong(4, record.requestedBurst) - stmt.setLong(5, record.grantedBurst) - stmt.setLong(6, record.overcommissionedBurst) - stmt.setLong(7, record.interferedBurst) - stmt.setDouble(8, record.cpuUsage) - stmt.setDouble(9, record.cpuDemand) - stmt.setInt(10, record.numberOfDeployedImages) - stmt.setString(11, record.hostServer.uid.toString()) - stmt.setString(12, record.hostServer.state.name) - stmt.setDouble(13, record.hostUsage) - stmt.setDouble(14, record.powerDraw) - stmt.setLong(15, record.submittedVms) - stmt.setLong(16, record.queuedVms) - stmt.setLong(17, record.runningVms) - stmt.setLong(18, record.finishedVms) - stmt.addBatch() - batch++ - - if (batch > batchSize) { - stmt.executeBatch() - batch = 0 - } - } - } finally { - stmt.executeBatch() - stmt.close() - conn.close() - } - } - - override suspend fun reportVmStateChange(server: Server) {} - - override suspend fun reportHostStateChange( - driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long - ) { - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second - reportHostSlice( - simulationContext.clock.millis(), - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - submittedVms, - queuedVms, - runningVms, - finishedVms, - duration - ) - } - - logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]") - - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) - } - - override suspend fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, - duration: Long - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - - queue.put( - Report( - time, - duration, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - numberOfDeployedImages, - hostServer, - usage, - powerDraw, - submittedVms, - queuedVms, - runningVms, - finishedVms - ) - ) - } - - override fun close() { - // Busy loop to wait for writer thread to finish - stop.set(true) - writerThread.join() - } - - data class Report( - val time: Long, - val duration: Long, - val requestedBurst: Long, - val grantedBurst: Long, - val overcommissionedBurst: Long, - val interferedBurst: Long, - val cpuUsage: Double, - val cpuDemand: Double, - val numberOfDeployedImages: Int, - val hostServer: Server, - val hostUsage: Double, - val powerDraw: Double, - val submittedVms: Long, - val queuedVms: Long, - val runningVms: Long, - val finishedVms: Long - ) -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt new file mode 100644 index 00000000..f3ac2554 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt @@ -0,0 +1,78 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetRunEventWriter +import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader +import java.io.File + +/** + * The global configuration of the experiment. + * + * @param environments The path to the topologies directory. + * @param traces The path to the traces directory. + * @param output The output directory. + * @param performanceInterferenceModel The optional performance interference model that has been specified. + * @param vmPlacements Original VM placement in the trace. + * @param bufferSize The buffer size of the event reporters. + */ +public abstract class Experiment( + val environments: File, + val traces: File, + val output: File, + val performanceInterferenceModel: PerformanceInterferenceModelReader?, + val vmPlacements: Map<String, String>, + val bufferSize: Int +) : ContainerExperimentDescriptor() { + override val parent: ExperimentDescriptor? = null + + override suspend fun invoke(context: ExperimentExecutionContext) { + val writer = ParquetRunEventWriter(File(output, "experiments.parquet"), bufferSize) + try { + val listener = object : ExperimentExecutionListener by context.listener { + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + if (descriptor is Run) { + writer.write(RunEvent(descriptor, System.currentTimeMillis())) + } + + context.listener.descriptorRegistered(descriptor) + } + } + + val newContext = object : ExperimentExecutionContext by context { + override val listener: ExperimentExecutionListener = listener + } + + super.invoke(newContext) + } finally { + writer.close() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index e37dea8b..83952d43 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.experiment import com.atlarge.odcsim.Domain import com.atlarge.odcsim.simulationContext @@ -31,14 +31,18 @@ import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor +import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -66,7 +70,7 @@ private val logger = KotlinLogging.logger {} */ suspend fun createFailureDomain( seed: Int, - failureInterval: Int, + failureInterval: Double, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit> ): Domain { @@ -79,7 +83,13 @@ suspend fun createFailureDomain( for (node in bareMetalProvisioner.nodes()) { val cluster = node.metadata[NODE_CLUSTER] as String val injector = - injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } + injectors.getOrPut(cluster) { + createFaultInjector( + simulationContext.domain, + random, + failureInterval + ) + } injector.enqueue(node.metadata["driver"] as FailureDomain) } } @@ -89,13 +99,13 @@ suspend fun createFailureDomain( /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector { +fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 return CorrelatedFaultInjector( domain, - iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, + iatScale = ln(failureInterval), iatShape = 1.03, // Hours + sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 dScale = 9.51, dShape = 3.21, // Minutes random = random ) @@ -104,8 +114,13 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): F /** * Create the trace reader from which the VM workloads are read. */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20StreamingParquetTraceReader { + return Sc20StreamingParquetTraceReader( + path, + performanceInterferenceModel, + vms, + Random(seed) + ) } /** @@ -134,19 +149,21 @@ suspend fun createProvisioner( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { val domain = simulationContext.domain + val clock = simulationContext.clock val hypervisors = scheduler.drivers() // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) hypervisor.server.events .onEach { event -> + val time = clock.millis() when (event) { is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + monitor.reportHostStateChange(time, hypervisor, event.server) } } } @@ -154,7 +171,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( + is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, @@ -163,26 +180,36 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms + event.hostServer ) } } .launchIn(domain) + + val driver = hypervisor.server.services[BareMetalDriver.Key] + driver.powerDraw + .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .launchIn(domain) } + + scheduler.events + .onEach { event -> + when (event) { + is VirtProvisioningEvent.MetricsAvailable -> + monitor.reportProvisionerMetrics(clock.millis(), event) + } + } + .launchIn(domain) } /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) { +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { val domain = simulationContext.domain try { - var submitted = 0L + var submitted = 0 val finished = Channel<Unit>(Channel.CONFLATED) val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) @@ -215,8 +242,10 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP // Monitor server events server.events .onEach { + val time = simulationContext.clock.millis() + if (it is ServerEvent.StateChanged) { - reporter.reportVmStateChange(it.server) + monitor.reportVmStateChange(time, it.server) } delay(1) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt new file mode 100644 index 00000000..6a40f5fb --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt @@ -0,0 +1,90 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor + +/** + * A portfolio represents a collection of scenarios are tested. + */ +public abstract class Portfolio( + override val parent: Experiment, + val id: Int, + val name: String +) : ContainerExperimentDescriptor() { + /** + * The topologies to consider. + */ + protected abstract val topologies: List<Topology> + + /** + * The workloads to consider. + */ + protected abstract val workloads: List<Workload> + + /** + * The operational phenomenas to consider. + */ + protected abstract val operationalPhenomenas: List<OperationalPhenomena> + + /** + * The allocation policies to consider. + */ + protected abstract val allocationPolicies: List<String> + + /** + * The number of repetitions to perform. + */ + open val repetitions: Int = 32 + + /** + * Resolve the children of this container. + */ + override val children: Sequence<Scenario> = sequence { + var id = 0 + for (topology in topologies) { + for (workload in workloads) { + for (operationalPhenomena in operationalPhenomenas) { + for (allocationPolicy in allocationPolicies) { + yield( + Scenario( + this@Portfolio, + id++, + repetitions, + topology, + workload, + allocationPolicy, + operationalPhenomena + ) + ) + } + } + } + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt new file mode 100644 index 00000000..362144ae --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -0,0 +1,155 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload + +public class HorVerPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "horizontal_vs_vertical") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vol-hor-hom"), + Topology("rep-vol-hor-het"), + Topology("rep-vol-ver-hom"), + Topology("rep-vol-ver-het"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-hor-het"), + Topology("exp-vol-ver-hom"), + Topology("exp-vol-ver-het") + ) + + override val workloads = listOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_velocity") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vel-ver-hom"), + Topology("rep-vel-ver-het"), + Topology("exp-vel-ver-hom"), + Topology("exp-vel-ver-het") + ) + + override val workloads = listOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") { + override val topologies = listOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workloads = listOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "operational_phenomena") { + override val topologies = listOf( + Topology("base") + ) + + override val workloads = listOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) + ) + + override val allocationPolicies = listOf( + "mem", + "mem-inv", + "core-mem", + "core-mem-inv", + "active-servers", + "active-servers-inv", + "random" + ) +} + +public class TestPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "test") { + override val repetitions: Int = 1 + + override val topologies: List<Topology> = listOf( + Topology("base") + ) + + override val workloads: List<Workload> = listOf( + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas: List<OperationalPhenomena> = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicies: List<String> = listOf("active-servers") +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt new file mode 100644 index 00000000..fd3e29c8 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -0,0 +1,146 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor +import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import mu.KotlinLogging +import java.io.File +import java.util.ServiceLoader +import kotlin.random.Random + +/** + * The logger for the experiment scenario. + */ +private val logger = KotlinLogging.logger {} + +/** + * The provider for the simulation engine to use. + */ +private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + +/** + * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the + * same set of parameters. + */ +public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() { + override suspend fun invoke(context: ExperimentExecutionContext) { + val experiment = parent.parent.parent + val system = provider("experiment-$id") + val root = system.newDomain("root") + val seeder = Random(seed) + val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt")) + + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = when (parent.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + "replay" -> ReplayAllocationPolicy(emptyMap()) + else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}") + } + + @Suppress("UNCHECKED_CAST") + val rawTraceReaders = context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader> + val raw = synchronized(rawTraceReaders) { + val name = parent.workload.name + rawTraceReaders.computeIfAbsent(name) { + logger.info { "Loading trace $name" } + Sc20RawParquetTraceReader(File(experiment.traces, name)) + } + } + val performanceInterferenceModel = experiment.performanceInterferenceModel + ?.takeIf { parent.operationalPhenomena.hasInterference } + ?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader(raw, performanceInterferenceModel, parent.workload, seed) + + val monitor = ParquetExperimentMonitor(this) + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + root, + environment, + allocationPolicy + ) + + val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { + logger.debug("ENABLING failures") + createFailureDomain( + seeder.nextInt(), + parent.operationalPhenomena.failureFrequency, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace( + trace, + scheduler, + chan, + monitor, + experiment.vmPlacements + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + system.run() + } finally { + system.terminate() + monitor.close() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt new file mode 100644 index 00000000..98bc7fc2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor + +/** + * A scenario represents a single point in the design space (a unique combination of parameters). + */ +public class Scenario( + override val parent: Portfolio, + val id: Int, + val repetitions: Int, + val topology: Topology, + val workload: Workload, + val allocationPolicy: String, + val operationalPhenomena: OperationalPhenomena +) : ContainerExperimentDescriptor() { + override val children: Sequence<ExperimentDescriptor> = sequence { + repeat(repetitions) { i -> yield(Run(this@Scenario, i, i)) } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt new file mode 100644 index 00000000..af99df84 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.model + +/** + * Operation phenomena during experiments. + * + * @param failureFrequency The average time between failures in hours. + * @param hasInterference A flag to enable performance interference between VMs. + */ +public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt new file mode 100644 index 00000000..3ed71e09 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.model + +/** + * The datacenter topology on which we test the workload. + */ +public data class Topology(val name: String) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt new file mode 100644 index 00000000..2dbdf570 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.model + +/** + * A workload that is considered for a scenario. + */ +public class Workload(val name: String, val fraction: Double) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt index 84500417..1f674f00 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt @@ -22,34 +22,40 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.experiment.monitor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import java.io.Closeable -interface Sc20Reporter : Closeable { +/** + * A monitor watches the events of an experiment. + */ +interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a VM changes. */ - suspend fun reportVmStateChange(server: Server) {} + fun reportVmStateChange(time: Long, server: Server) {} /** * This method is invoked when the state of a host changes. */ - suspend fun reportHostStateChange( + fun reportHostStateChange( + time: Long, driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long + server: Server ) {} /** + * Report the power consumption of a host. + */ + fun reportPowerConsumption(host: Server, draw: Double) {} + + /** * This method is invoked for a host for each slice that is finishes. */ - suspend fun reportHostSlice( + fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -59,10 +65,11 @@ interface Sc20Reporter : Closeable { cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, duration: Long = 5 * 60 * 1000L ) {} + + /** + * This method is invoked for a provisioner event. + */ + fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt new file mode 100644 index 00000000..33978aab --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -0,0 +1,145 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.monitor + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent +import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent +import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter +import mu.KotlinLogging +import java.io.File + +/** + * The logger instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * An [ExperimentMonitor] that logs the events to a Parquet file. + */ +class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { + private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}" + private val hostWriter = ParquetHostEventWriter( + File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"), + run.parent.parent.parent.bufferSize + ) + private val provisionerWriter = ParquetProvisionerEventWriter( + File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"), + run.parent.parent.parent.bufferSize + ) + private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + + override fun reportVmStateChange(time: Long, server: Server) {} + + override fun reportHostStateChange( + time: Long, + driver: VirtDriver, + server: Server + ) { + logger.debug("Host ${server.uid} changed state ${server.state} [$time]") + + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = time - lastServerState.second + reportHostSlice( + time, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + duration + ) + + lastServerStates.remove(server) + lastPowerConsumption.remove(server) + } else { + lastServerStates[server] = Pair(server.state, time) + } + } + + private val lastPowerConsumption = mutableMapOf<Server, Double>() + + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw + } + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long + ) { + hostWriter.write( + HostEvent( + time, + duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + ) + } + + override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + provisionerWriter.write( + ProvisionerEvent( + time, + event.totalHostCount, + event.availableHostCount, + event.totalVmCount, + event.activeVmCount, + event.inactiveVmCount, + event.waitingVmCount, + event.failedVmCount + ) + ) + } + + override fun close() { + hostWriter.close() + provisionerWriter.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt new file mode 100644 index 00000000..f59402d5 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import me.tongfei.progressbar.ProgressBar +import me.tongfei.progressbar.ProgressBarBuilder + +/** + * A reporter that reports the experiment progress to the console. + */ +public class ConsoleExperimentReporter : ExperimentExecutionListener { + /** + * The active [Run]s. + */ + private val runs: MutableSet<Run> = mutableSetOf() + + /** + * The total number of runs. + */ + private var total = 0 + + /** + * The progress bar to keep track of the progress. + */ + private val pb: ProgressBar = ProgressBarBuilder() + .setTaskName("") + .setInitialMax(1) + .build() + + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + if (descriptor is Run) { + runs += descriptor + pb.maxHint((++total).toLong()) + } + } + + override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) { + if (descriptor is Run) { + runs -= descriptor + + pb.stepTo(total - runs.size.toLong()) + if (runs.isEmpty()) { + pb.close() + } + } + } + + override fun executionStarted(descriptor: ExperimentDescriptor) {} +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt new file mode 100644 index 00000000..dac32586 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope + +/** + * An abstract [ExperimentDescriptor] specifically for containers. + */ +public abstract class ContainerExperimentDescriptor : ExperimentDescriptor() { + /** + * The child descriptors of this container. + */ + public abstract val children: Sequence<ExperimentDescriptor> + + override val type: Type = Type.CONTAINER + + override suspend fun invoke(context: ExperimentExecutionContext) { + val materializedChildren = children.toList() + for (child in materializedChildren) { + context.listener.descriptorRegistered(child) + } + + supervisorScope { + for (child in materializedChildren) { + if (child.isTrial) { + launch { + val worker = context.scheduler.allocate() + context.listener.executionStarted(child) + try { + worker(child, context) + context.listener.executionFinished(child, ExperimentExecutionResult.Success) + } catch (e: Throwable) { + context.listener.executionFinished(child, ExperimentExecutionResult.Failed(e)) + } + } + } else { + launch { child(context) } + } + } + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt new file mode 100644 index 00000000..64b6b767 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt @@ -0,0 +1,81 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import java.io.Serializable + +/** + * An immutable description of an experiment in the **odcsim* simulation framework, which may be a single atomic trial + * or a composition of multiple trials. + * + * This class represents a dynamic tree-like structure where the children of the nodes are not known at instantiation + * since they might be generated dynamically. + */ +public abstract class ExperimentDescriptor : Serializable { + /** + * The parent of this descriptor, or `null` if it has no parent. + */ + public abstract val parent: ExperimentDescriptor? + + /** + * The type of descriptor. + */ + abstract val type: Type + + /** + * A flag to indicate that this descriptor is a root descriptor. + */ + public open val isRoot: Boolean + get() = parent == null + + /** + * A flag to indicate that this descriptor describes an experiment trial. + */ + val isTrial: Boolean + get() = type == Type.TRIAL + + /** + * Execute this [ExperimentDescriptor]. + * + * @param context The context to execute the descriptor in. + */ + public abstract suspend operator fun invoke(context: ExperimentExecutionContext) + + /** + * The types of experiment descriptors. + */ + enum class Type { + /** + * A composition of multiple experiment descriptions whose invocation happens on a single thread. + */ + CONTAINER, + + /** + * An invocation of a single scenario of an experiment whose invocation may happen on different threads. + */ + TRIAL + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt new file mode 100644 index 00000000..77f970fe --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener + +/** + * An [ExperimentRunner] facilitates discovery and execution of experiments. + */ +public interface ExperimentRunner { + /** + * The unique identifier of this runner. + */ + val id: String + + /** + * The version of this runner. + */ + val version: String? + get() = null + + /** + * Execute the specified experiment represented as [ExperimentDescriptor]. + * + * @param root The experiment to execute. + * @param listener The listener to report events to. + */ + public fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt new file mode 100644 index 00000000..cf05416a --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt @@ -0,0 +1,32 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +/** + * An abstract [ExperimentDescriptor] specifically for trials. + */ +public abstract class TrialExperimentDescriptor : ExperimentDescriptor() { + override val type: Type = Type.TRIAL +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt new file mode 100644 index 00000000..9a04c491 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +/** + * The execution context of an experiment. + */ +public interface ExperimentExecutionContext { + /** + * The execution listener to use. + */ + public val listener: ExperimentExecutionListener + + /** + * The experiment scheduler to use. + */ + public val scheduler: ExperimentScheduler + + /** + * A cache for objects within a single runner. + */ + public val cache: MutableMap<Any?, Any?> +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt new file mode 100644 index 00000000..f6df0524 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor + +/** + * Listener to be notified of experiment execution events by experiment runners. + */ +interface ExperimentExecutionListener { + /** + * A method that is invoked when a new [ExperimentDescriptor] is registered. + */ + fun descriptorRegistered(descriptor: ExperimentDescriptor) + + /** + * A method that is invoked when when the execution of a leaf or subtree of the experiment tree has finished, + * regardless of the outcome. + */ + fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) + + /** + * A method that is invoked when the execution of a leaf or subtree of the experiment tree is about to be started. + */ + fun executionStarted(descriptor: ExperimentDescriptor) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt new file mode 100644 index 00000000..057e1f92 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import java.io.Serializable + +/** + * The result of executing an experiment. + */ +public sealed class ExperimentExecutionResult : Serializable { + /** + * The experiment executed successfully + */ + public object Success : ExperimentExecutionResult() + + /** + * The experiment failed during execution. + */ + public data class Failed(val throwable: Throwable) : ExperimentExecutionResult() +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt new file mode 100644 index 00000000..0346a7f8 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import java.io.Closeable + +/** + * A interface for scheduling the execution of experiment trials over compute resources (threads/containers/vms) + */ +interface ExperimentScheduler : Closeable { + /** + * Allocate a [Worker] for executing an experiment trial. This method may suspend in case no resources are directly + * available at the moment. + * + * @return The available worker. + */ + suspend fun allocate(): ExperimentScheduler.Worker + + /** + * An isolated worker of an [ExperimentScheduler] that is responsible for executing a single experiment trial. + */ + interface Worker { + /** + * Dispatch the specified [ExperimentDescriptor] to execute some time in the future and return the results of + * the trial. + * + * @param descriptor The descriptor to execute. + * @param context The context to execute the descriptor in. + * @return The results of the experiment trial. + */ + suspend operator fun invoke( + descriptor: ExperimentDescriptor, + context: ExperimentExecutionContext + ): ExperimentExecutionResult + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt new file mode 100644 index 00000000..31632b8c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt @@ -0,0 +1,85 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors + +/** + * An [ExperimentScheduler] that runs experiments using a local thread pool. + * + * @param parallelism The maximum amount of parallel workers (default is the number of available processors). + */ +class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().availableProcessors() + 1) : ExperimentScheduler { + private val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher() + private val tickets = Semaphore(parallelism) + + override suspend fun allocate(): ExperimentScheduler.Worker { + tickets.acquire() + return object : ExperimentScheduler.Worker { + override suspend fun invoke( + descriptor: ExperimentDescriptor, + context: ExperimentExecutionContext + ): ExperimentExecutionResult = supervisorScope { + val listener = + object : ExperimentExecutionListener { + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + launch { context.listener.descriptorRegistered(descriptor) } + } + + override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) { + launch { context.listener.executionFinished(descriptor, result) } + } + + override fun executionStarted(descriptor: ExperimentDescriptor) { + launch { context.listener.executionStarted(descriptor) } + } + } + + val newContext = object : ExperimentExecutionContext by context { + override val listener: ExperimentExecutionListener = listener + } + + try { + withContext(dispatcher) { + descriptor(newContext) + ExperimentExecutionResult.Success + } + } catch (e: Throwable) { + ExperimentExecutionResult.Failed(e) + } finally { + tickets.release() + } + } + } + } + + override fun close() = dispatcher.close() +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt new file mode 100644 index 00000000..3b80276f --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt @@ -0,0 +1,62 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.internal + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentRunner +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler +import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap + +/** + * The default implementation of the [ExperimentRunner] interface. + * + * @param scheduler The scheduler to use. + */ +public class DefaultExperimentRunner(val scheduler: ExperimentScheduler) : ExperimentRunner { + override val id: String = "default" + + override val version: String? = "1.0" + + override fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) = runBlocking { + val context = object : ExperimentExecutionContext { + override val listener: ExperimentExecutionListener = listener + override val scheduler: ExperimentScheduler = this@DefaultExperimentRunner.scheduler + override val cache: MutableMap<Any?, Any?> = ConcurrentHashMap() + } + + listener.descriptorRegistered(root) + context.listener.executionStarted(root) + try { + root(context) + context.listener.executionFinished(root, ExperimentExecutionResult.Success) + } catch (e: Throwable) { + context.listener.executionFinished(root, ExperimentExecutionResult.Failed(e)) + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt new file mode 100644 index 00000000..c1e14e2a --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +/** + * An event that occurs within the system. + */ +public abstract class Event(val name: String) { + /** + * The time of occurrence of this event. + */ + public abstract val timestamp: Long +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt new file mode 100644 index 00000000..8e91bca2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +import com.atlarge.opendc.compute.core.Server + +/** + * A periodic report of the host machine metrics. + */ +data class HostEvent( + override val timestamp: Long, + val duration: Long, + val host: Server, + val vmCount: Int, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double, + val powerDraw: Double +) : Event("host-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt new file mode 100644 index 00000000..df619632 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +/** + * A periodic report of the provisioner's metrics. + */ +data class ProvisionerEvent( + override val timestamp: Long, + val totalHostCount: Int, + val availableHostCount: Int, + val totalVmCount: Int, + val activeVmCount: Int, + val inactiveVmCount: Int, + val waitingVmCount: Int, + val failedVmCount: Int +) : Event("provisioner-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt new file mode 100644 index 00000000..497d2c3f --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +import com.atlarge.opendc.experiments.sc20.experiment.Run + +/** + * A periodic report of the host machine metrics. + */ +data class RunEvent( + val run: Run, + override val timestamp: Long +) : Event("run") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt new file mode 100644 index 00000000..7289fb21 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +import com.atlarge.opendc.compute.core.Server + +/** + * A periodic report of a virtual machine's metrics. + */ +data class VmEvent( + override val timestamp: Long, + val duration: Long, + val vm: Server, + val host: Server, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double +) : Event("vm-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt new file mode 100644 index 00000000..a69bd4b2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -0,0 +1,121 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.Event +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * A writer that writes events in Parquet format. + */ +public open class ParquetEventWriter<in T : Event>( + private val path: File, + private val schema: Schema, + private val converter: (T, GenericData.Record) -> Unit, + private val bufferSize: Int = 4096 +) : Runnable, Closeable { + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize) + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = true, name = "parquet-writer") { run() } + + /** + * Write the specified metrics to the database. + */ + public fun write(event: T) { + queue.put(Action.Write(event)) + } + + /** + * Signal the writer to stop. + */ + public override fun close() { + queue.put(Action.Stop) + writerThread.join() + } + + /** + * Start the writer thread. + */ + override fun run() { + val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + try { + loop@ while (true) { + val action = queue.take() + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> { + val record = GenericData.Record(schema) + @Suppress("UNCHECKED_CAST") + converter(action.event as T, record) + writer.write(record) + } + } + } + } catch (e: Throwable) { + logger.error("Writer failed", e) + } finally { + writer.close() + } + } + + sealed class Action { + /** + * A poison pill that will stop the writer thread. + */ + object Stop : Action() + + /** + * Write the specified metrics to the database. + */ + data class Write<out T : Event>(val event: T) : Action() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt new file mode 100644 index 00000000..523897b0 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt @@ -0,0 +1,81 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +/** + * A Parquet event writer for [HostEvent]s. + */ +public class ParquetHostEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) { + + override fun toString(): String = "host-writer" + + companion object { + val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> + // record.put("portfolio_id", event.run.parent.parent.id) + // record.put("scenario_id", event.run.parent.id) + // record.put("run_id", event.run.id) + record.put("host_id", event.host.name) + record.put("state", event.host.state.name) + record.put("timestamp", event.timestamp) + record.put("duration", event.duration) + record.put("vm_count", event.vmCount) + record.put("requested_burst", event.requestedBurst) + record.put("granted_burst", event.grantedBurst) + record.put("overcommissioned_burst", event.overcommissionedBurst) + record.put("interfered_burst", event.interferedBurst) + record.put("cpu_usage", event.cpuUsage) + record.put("cpu_demand", event.cpuDemand) + record.put("power_draw", event.powerDraw * (1.0 / 12)) + } + + val schema: Schema = SchemaBuilder + .record("host_metrics") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + // .name("portfolio_id").type().intType().noDefault() + // .name("scenario_id").type().intType().noDefault() + // .name("run_id").type().intType().noDefault() + .name("timestamp").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("vm_count").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .endRecord() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt new file mode 100644 index 00000000..1f3b0472 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +/** + * A Parquet event writer for [ProvisionerEvent]s. + */ +public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) { + + override fun toString(): String = "provisioner-writer" + + companion object { + val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> + record.put("timestamp", event.timestamp) + record.put("host_total_count", event.totalHostCount) + record.put("host_available_count", event.availableHostCount) + record.put("vm_total_count", event.totalVmCount) + record.put("vm_active_count", event.activeVmCount) + record.put("vm_inactive_count", event.inactiveVmCount) + record.put("vm_waiting_count", event.waitingVmCount) + record.put("vm_failed_count", event.failedVmCount) + } + + val schema: Schema = SchemaBuilder + .record("provisioner_metrics") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_total_count").type().intType().noDefault() + .name("host_available_count").type().intType().noDefault() + .name("vm_total_count").type().intType().noDefault() + .name("vm_active_count").type().intType().noDefault() + .name("vm_inactive_count").type().intType().noDefault() + .name("vm_waiting_count").type().intType().noDefault() + .name("vm_failed_count").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt new file mode 100644 index 00000000..1549b8d2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt @@ -0,0 +1,78 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +/** + * A Parquet event writer for [RunEvent]s. + */ +public class ParquetRunEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) { + + override fun toString(): String = "run-writer" + + companion object { + val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> + val run = event.run + val scenario = run.parent + val portfolio = scenario.parent + record.put("portfolio_id", portfolio.id) + record.put("portfolio_name", portfolio.name) + record.put("scenario_id", scenario.id) + record.put("run_id", run.id) + record.put("repetitions", scenario.repetitions) + record.put("topology", scenario.topology.name) + record.put("workload_name", scenario.workload.name) + record.put("workload_fraction", scenario.workload.fraction) + record.put("allocation_policy", scenario.allocationPolicy) + record.put("failure_frequency", scenario.operationalPhenomena.failureFrequency) + record.put("interference", scenario.operationalPhenomena.hasInterference) + record.put("seed", run.seed) + } + + val schema: Schema = SchemaBuilder + .record("runs") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("portfolio_id").type().intType().noDefault() + .name("portfolio_name").type().stringType().noDefault() + .name("scenario_id").type().intType().noDefault() + .name("run_id").type().intType().noDefault() + .name("repetitions").type().intType().noDefault() + .name("topology").type().stringType().noDefault() + .name("workload_name").type().stringType().noDefault() + .name("workload_fraction").type().doubleType().noDefault() + .name("allocation_policy").type().stringType().noDefault() + .name("failure_frequency").type().doubleType().noDefault() + .name("interference").type().booleanType().noDefault() + .name("seed").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..ad50bf18 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -0,0 +1,88 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.util.TreeSet + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param reader The internal trace reader to use. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + * @param run The run to which this reader belongs. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20ParquetTraceReader( + raw: Sc20RawParquetTraceReader, + performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, + workload: Workload, + seed: Int +) : TraceReader<VmWorkload> { + /** + * The iterator over the actual trace. + */ + private val iterator: Iterator<TraceEntry<VmWorkload>> = + raw.read() + .run { sampleWorkload(this, workload, seed) } + .run { + // Apply performance interference model + if (performanceInterferenceModel.isEmpty()) + this + else { + map { entry -> + val image = entry.workload.image + val id = image.name + val relevantPerformanceInterferenceModelItems = + performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) + + val newImage = + VmImage( + image.uid, + image.name, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + image.flopsHistory, + image.maxCores, + image.requiredMemory + ) + val newWorkload = entry.workload.copy(image = newImage) + Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + } + } + } + .iterator() + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<VmWorkload> = iterator.next() + + override fun close() {} +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt new file mode 100644 index 00000000..3b480d33 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -0,0 +1,167 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import java.io.File +import java.util.UUID + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param path The directory of the traces. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20RawParquetTraceReader(private val path: File) { + /** + * Read the fragments into memory. + */ + private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> { + val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) + .disableCompatibility() + .build() + + val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>() + + return try { + while (true) { + val record = reader.read() ?: break + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + fragments.getOrPut(id) { mutableListOf() }.add(fragment) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> { + val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) + .disableCompatibility() + .build() + + var counter = 0 + val entries = mutableListOf<TraceEntryImpl>() + + return try { + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + logger.info { "VM $id" } + + val vmFragments = fragments.getValue(id).asSequence() + val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs + val vmWorkload = VmWorkload( + uid, id, + UnnamedUser, + VmImage( + uid, + id, + mapOf( + "submit-time" to submissionTime, + "end-time" to endTime, + "total-load" to totalLoad + ), + vmFragments, + maxCores, + requiredMemory + ) + ) + entries.add(TraceEntryImpl(submissionTime, vmWorkload)) + } + + entries + } finally { + metaReader.close() + } + } + + /** + * The entries in the trace. + */ + private val entries: List<TraceEntryImpl> + + init { + val fragments = parseFragments(path) + entries = parseMeta(path, fragments) + } + + /** + * Read the entries in the trace. + */ + public fun read(): List<TraceEntry<VmWorkload>> = entries + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "<unnamed>" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + internal data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry<VmWorkload> +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 8ae1693c..f6d6e6fd 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.trace import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage @@ -53,13 +53,13 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * A [TraceReader] for the internal VM workload trace format. + * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. * * @param traceFile The directory of the traces. * @param performanceInterferenceModel The performance model covering the workload in the VM trace. */ @OptIn(ExperimentalStdlibApi::class) -class Sc20ParquetTraceReader( +class Sc20StreamingParquetTraceReader( traceFile: File, performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, @@ -82,7 +82,11 @@ class Sc20ParquetTraceReader( if (selectedVms.isEmpty()) null else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) + )) /** * A poisonous fragment. @@ -227,11 +231,12 @@ class Sc20ParquetTraceReader( } val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), Random(random.nextInt()) ) val vmWorkload = VmWorkload( - uid, "VM Workload $id", UnnamedUser, + uid, "VM Workload $id", + UnnamedUser, VmImage( uid, id, @@ -242,7 +247,10 @@ class Sc20ParquetTraceReader( ) ) - TraceEntryImpl(submissionTime, vmWorkload) + TraceEntryImpl( + submissionTime, + vmWorkload + ) } .sortedBy { it.submissionTime } .toList() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index c62f59f9..04cdd302 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.trace import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData @@ -139,7 +139,14 @@ fun main(args: Array<String>) { ) } else { val fragment = - Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores) + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) if (last != null) { yield(last!!) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt new file mode 100644 index 00000000..a8580686 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -0,0 +1,69 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.format.trace.TraceEntry +import mu.KotlinLogging +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * Sample the workload for the specified [run]. + */ +fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> { + return sampleRegularWorkload(trace, workload, seed) +} + +/** + * Sample a regular (non-HPC) workload. + */ +fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> { + val fraction = workload.fraction + if (fraction >= 1) { + return trace + } + + val shuffled = trace.shuffled(Random(seed)) + val res = mutableListOf<TraceEntry<VmWorkload>>() + val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + var currentLoad = 0.0 + + for (entry in shuffled) { + val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res +} diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json b/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json deleted file mode 100644 index 87a4f8af..00000000 --- a/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "vms": [545, 223], - "minServerLoad": 0.84, - "performanceScore": 0.6 - } -] diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json deleted file mode 100644 index 80b24dba..00000000 --- a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "name": "Experimental Setup 2", - "rooms": [ - { - "type": "SERVER", - "objects": [ - { - "type": "RACK", - "machines": [ - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]} - ] - } - ] - } - ] -} diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json deleted file mode 100644 index 02a1d25b..00000000 --- a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "Experimental Setup 2", - "rooms": [ - { - "type": "SERVER", - "objects": [ - { - "type": "RACK", - "machines": [ - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]} - ] - }, - { - "type": "RACK", - "machines": [ - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]} - ] - } - ] - } - ] -} diff --git a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml index 77a15e55..f47a6da8 100644 --- a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -33,7 +33,10 @@ <Logger name="com.atlarge.odcsim" level="info" additivity="false"> <AppenderRef ref="Console"/> </Logger> - <Logger name="com.atlarge.opendc" level="info" additivity="false"> + <Logger name="com.atlarge.opendc" level="warn" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="com.atlarge.opendc.experiments.sc20" level="info" additivity="false"> <AppenderRef ref="Console"/> </Logger> <Logger name="org.apache.hadoop" level="warn" additivity="false"> diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 239d018a..abd5c961 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -31,10 +31,17 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor +import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain +import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor +import com.atlarge.opendc.experiments.sc20.experiment.processTrace +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -63,7 +70,7 @@ class Sc20IntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestSc20Reporter + private lateinit var monitor: TestExperimentReporter /** * Setup the experimental environment. @@ -73,7 +80,7 @@ class Sc20IntegrationTest { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() simulationEngine = provider("test") root = simulationEngine.newDomain("root") - monitor = TestSc20Reporter() + monitor = TestExperimentReporter() } /** @@ -95,19 +102,33 @@ class Sc20IntegrationTest { lateinit var scheduler: SimpleVirtProvisioningService root.launch { - val res = createProvisioner(root, environmentReader, allocationPolicy) + val res = createProvisioner( + root, + environmentReader, + allocationPolicy + ) val bareMetalProvisioner = res.first scheduler = res.second val failureDomain = if (failures) { println("ENABLING failures") - createFailureDomain(seed, 24 * 7, bareMetalProvisioner, chan) + createFailureDomain( + seed, + 24.0 * 7, + bareMetalProvisioner, + chan + ) } else { null } attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, monitor) + processTrace( + traceReader, + scheduler, + chan, + monitor + ) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") @@ -121,8 +142,8 @@ class Sc20IntegrationTest { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") assertEquals(207379117949, monitor.totalRequestedBurst) - assertEquals(207378478631, monitor.totalGrantedBurst) - assertEquals(639360, monitor.totalOvercommissionedBurst) + assertEquals(207102919834, monitor.totalGrantedBurst) + assertEquals(276198896, monitor.totalOvercommissionedBurst) assertEquals(0, monitor.totalInterferedBurst) } @@ -137,10 +158,12 @@ class Sc20IntegrationTest { * Obtain the trace reader for the test. */ private fun createTestTraceReader(): TraceReader<VmWorkload> { - val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0) + return Sc20ParquetTraceReader( + Sc20RawParquetTraceReader(File("src/test/resources/trace")), + emptyMap(), + Workload("test", 1.0), + 0 + ) } /** @@ -151,13 +174,13 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestSc20Reporter : Sc20Reporter { + class TestExperimentReporter : ExperimentMonitor { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L var totalInterferedBurst = 0L - override suspend fun reportHostSlice( + override fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -167,10 +190,6 @@ class Sc20IntegrationTest { cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, duration: Long ) { totalRequestedBurst += requestedBurst diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt index a653e643..407bc0b4 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt @@ -26,6 +26,7 @@ package com.atlarge.opendc.format.trace import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import java.io.Closeable +import kotlin.random.Random /** * An interface for reading descriptions of performance interference models into memory. @@ -34,5 +35,5 @@ interface PerformanceInterferenceModelReader : Closeable { /** * Construct a [PerformanceInterferenceModel]. */ - fun construct(): PerformanceInterferenceModel + fun construct(random: Random): Map<String, PerformanceInterferenceModel> } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 5220af9b..2a8fefeb 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -125,7 +125,7 @@ class BitbrainsTraceReader( val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSortedSet() ) val vmWorkload = VmWorkload( diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt index 8562cefe..0e8e1fd2 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt @@ -32,6 +32,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import java.io.InputStream import java.util.TreeSet +import kotlin.random.Random /** * A parser for the JSON performance interference setup files used for the SC20 paper. @@ -42,20 +43,25 @@ import java.util.TreeSet class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : PerformanceInterferenceModelReader { /** - * The environment that was read from the file. + * The computed value from the file. */ - private val performanceInterferenceModel: List<PerformanceInterferenceEntry> = mapper.readValue(input) + private val items: Map<String, TreeSet<PerformanceInterferenceModelItem>> - override fun construct(): PerformanceInterferenceModel { - return PerformanceInterferenceModel( - performanceInterferenceModel.map { item -> - PerformanceInterferenceModelItem( - TreeSet(item.vms), - item.minServerLoad, - item.performanceScore - ) - }.toSet() - ) + init { + val entries: List<PerformanceInterferenceEntry> = mapper.readValue(input) + val res = mutableMapOf<String, TreeSet<PerformanceInterferenceModelItem>>() + for (entry in entries) { + val item = PerformanceInterferenceModelItem(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore) + for (workload in entry.vms) { + res.computeIfAbsent(workload) { TreeSet() }.add(item) + } + } + + items = res + } + + override fun construct(random: Random): Map<String, PerformanceInterferenceModel> { + return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) } } override fun close() {} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index c53cd569..076274d5 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -160,7 +160,7 @@ class Sc20TraceReader( val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(), + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), Random(random.nextInt()) ) val vmWorkload = VmWorkload( |
