From 924179b45b4e7e1ac848fd852fe39d927ca0d85a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 13 May 2020 01:28:18 +0200 Subject: feat: Add experiment orchestrator in Kotlin --- opendc/opendc-experiments-sc20/build.gradle.kts | 1 + opendc/opendc-experiments-sc20/schema.sql | 75 +++-- .../opendc/experiments/sc20/ExperimentRunner.kt | 202 ++++++++++++++ .../opendc/experiments/sc20/ExperimentRunnerCli.kt | 219 +++------------ .../atlarge/opendc/experiments/sc20/Portfolio.kt | 35 +++ .../atlarge/opendc/experiments/sc20/Portfolios.kt | 163 +++++++++++ .../com/atlarge/opendc/experiments/sc20/Run.kt | 31 +++ .../atlarge/opendc/experiments/sc20/Scenario.kt | 129 +++++++++ .../atlarge/opendc/experiments/sc20/Topology.kt | 30 ++ .../atlarge/opendc/experiments/sc20/Workload.kt | 30 ++ .../sc20/reporter/ExperimentReporterProvider.kt | 31 +++ .../sc20/trace/Sc20ParquetTraceReader.kt | 301 --------------------- .../sc20/trace/Sc20StreamingParquetTraceReader.kt | 301 +++++++++++++++++++++ .../opendc/experiments/sc20/util/DatabaseHelper.kt | 160 +++++++++++ 14 files changed, 1196 insertions(+), 512 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index df291039..b7440792 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -46,6 +46,7 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + implementation("com.zaxxer:HikariCP:3.4.5") runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql index 677240fa..515348c6 100644 --- a/opendc/opendc-experiments-sc20/schema.sql +++ b/opendc/opendc-experiments-sc20/schema.sql @@ -1,13 +1,26 @@ --- A portfolio represents a collection of scenarios are tested. -DROP TABLE IF EXISTS portfolios; +-- An experiment represents a collection of portfolios. +DROP TABLE IF EXISTS experiments CASCADE; +CREATE TABLE experiments +( + id BIGSERIAL PRIMARY KEY NOT NULL, + creation_time TIMESTAMP NOT NULL DEFAULT (now()) +); + +-- A portfolio represents a collection of scenarios tested. +DROP TABLE IF EXISTS portfolios CASCADE; CREATE TABLE portfolios ( - id BIGSERIAL PRIMARY KEY NOT NULL, - name TEXT NOT NULL + id BIGSERIAL PRIMARY KEY NOT NULL, + experiment_id BIGINT NOT NULL, + name TEXT NOT NULL, + + FOREIGN KEY (experiment_id) REFERENCES experiments (id) + ON DELETE CASCADE + ON UPDATE CASCADE ); -- A scenario represents a single point in the design space (a unique combination of parameters) -DROP TABLE IF EXISTS scenarios; +DROP TABLE IF EXISTS scenarios CASCADE; CREATE TABLE scenarios ( id BIGSERIAL PRIMARY KEY NOT NULL, @@ -17,27 +30,30 @@ CREATE TABLE scenarios workload_name TEXT NOT NULL, workload_fraction DOUBLE PRECISION NOT NULL, allocation_policy TEXT NOT NULL, - failures BIT NOT NULL, - interference BIT NOT NULL, + failures BOOLEAN NOT NULL, + interference BOOLEAN NOT NULL, FOREIGN KEY (portfolio_id) REFERENCES portfolios (id) ON DELETE CASCADE ON UPDATE CASCADE + ); +DROP TYPE IF EXISTS run_state CASCADE; CREATE TYPE run_state AS ENUM ('wait', 'active', 'fail', 'ok'); -- An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the -- same set of parameters. -DROP TABLE IF EXISTS runs; +DROP TABLE IF EXISTS runs CASCADE; CREATE TABLE runs ( - id INTEGER NOT NULL, - scenario_id BIGINT NOT NULL, - seed INTEGER NOT NULL, - state run_state NOT NULL DEFAULT 'wait'::run_state, - start_time TIMESTAMP NOT NULL, - end_time TIMESTAMP, + id INTEGER NOT NULL, + scenario_id BIGINT NOT NULL, + seed INTEGER NOT NULL, + state run_state NOT NULL DEFAULT 'wait'::run_state, + submit_time TIMESTAMP NOT NULL DEFAULT (now()), + start_time TIMESTAMP DEFAULT (NULL), + end_time TIMESTAMP DEFAULT (NULL), PRIMARY KEY (scenario_id, id), FOREIGN KEY (scenario_id) REFERENCES scenarios (id) @@ -46,7 +62,7 @@ CREATE TABLE runs ); -- Metrics of the hypervisors reported per slice -DROP TABLE IF EXISTS host_metrics; +DROP TABLE IF EXISTS host_metrics CASCADE; CREATE TABLE host_metrics ( id BIGSERIAL PRIMARY KEY NOT NULL, @@ -70,10 +86,11 @@ CREATE TABLE host_metrics ON UPDATE CASCADE ); +DROP INDEX IF EXISTS host_metrics_idx; CREATE INDEX host_metrics_idx ON host_metrics (scenario_id, run_id, timestamp, host_id); -- Metrics of the VMs reported per slice -DROP TABLE IF EXISTS vm_metrics; +DROP TABLE IF EXISTS vm_metrics CASCADE; CREATE TABLE vm_metrics ( id BIGSERIAL PRIMARY KEY NOT NULL, @@ -96,27 +113,29 @@ CREATE TABLE vm_metrics ON UPDATE CASCADE ); +DROP INDEX IF EXISTS vm_metrics_idx; CREATE INDEX vm_metrics_idx ON vm_metrics (scenario_id, run_id, timestamp, vm_id); -- Metrics of the provisioner reported per change -DROP TABLE IF EXISTS provisioner_metrics; +DROP TABLE IF EXISTS provisioner_metrics CASCADE; CREATE TABLE provisioner_metrics ( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - timestamp TIMESTAMP NOT NULL, - host_total_count INTEGER NOT NULL, - host_available_count INTEGER NOT NULL, - vm_total_count INTEGER NOT NULL, - vm_active_count INTEGER NOT NULL, - vm_inactive_count INTEGER NOT NULL, - vm_waiting_count INTEGER NOT NULL, - vm_failed_count INTEGER NOT NULL, + id BIGSERIAL PRIMARY KEY NOT NULL, + scenario_id BIGINT NOT NULL, + run_id INTEGER NOT NULL, + timestamp TIMESTAMP NOT NULL, + host_total_count INTEGER NOT NULL, + host_available_count INTEGER NOT NULL, + vm_total_count INTEGER NOT NULL, + vm_active_count INTEGER NOT NULL, + vm_inactive_count INTEGER NOT NULL, + vm_waiting_count INTEGER NOT NULL, + vm_failed_count INTEGER NOT NULL, FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) ON DELETE CASCADE ON UPDATE CASCADE ); +DROP INDEX IF EXISTS provisioner_metrics_idx; CREATE INDEX provisioner_metrics_idx ON provisioner_metrics (scenario_id, run_id, timestamp); diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt new file mode 100644 index 00000000..a9429367 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -0,0 +1,202 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import mu.KotlinLogging +import java.io.Closeable +import java.io.File +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger +import javax.sql.DataSource +import kotlin.random.Random +import kotlin.system.measureTimeMillis + +/** + * The logger for the experiment runner. + */ +private val logger = KotlinLogging.logger {} + +/** + * The experiment runner is responsible for orchestrating the simulation runs of an experiment. + * + * @param portfolios The portfolios to consider. + * @param ds The data source to write the experimental results to. + */ +public class ExperimentRunner( + private val portfolios: List, + private val ds: DataSource, + private val reporterProvider: ExperimentReporterProvider, + private val environmentPath: File, + private val tracePath: File, + private val performanceInterferenceModel: PerformanceInterferenceModel +) : Closeable { + /** + * The database helper to write the execution plan. + */ + private val helper = DatabaseHelper(ds.connection) + + /** + * The experiment identifier. + */ + private var experimentId = -1L + + /** + * The mapping of portfolios to their ids. + */ + private val portfolioIds = mutableMapOf() + + /** + * The mapping of scenarios to their ids. + */ + private val scenarioIds = mutableMapOf() + + /** + * Create an execution plan + */ + private fun createPlan(): List { + val runs = mutableListOf() + + for (portfolio in portfolios) { + val portfolioId = helper.persist(portfolio, experimentId) + portfolioIds[portfolio] = portfolioId + var scenarios = 0 + var runCount = 0 + + for (scenario in portfolio.scenarios) { + val scenarioId = helper.persist(scenario, portfolioId) + scenarioIds[scenario] = scenarioId + scenarios++ + + for (run in scenario.runs) { + helper.persist(run, scenarioId) + runCount++ + runs.add(run) + } + } + + logger.info { "Portfolio $portfolioId: ${portfolio.name} ($scenarios scenarios, $runCount runs total)" } + } + + return runs + } + + /** + * Create a trace reader for the specified trace. + */ + private fun createTraceReader( + name: String, + performanceInterferenceModel: PerformanceInterferenceModel, + seed: Int + ): TraceReader { + return Sc20ParquetTraceReader( + File(tracePath, name), + performanceInterferenceModel, + emptyList(), + Random(seed) + ) + } + + /** + * Create the environment reader for the specified environment. + */ + private fun createEnvironmentReader(name: String): EnvironmentReader { + return Sc20ClusterEnvironmentReader(File(environmentPath, "$name.txt")) + } + + /** + * Run the specified run. + */ + private fun run(run: Run) { + val reporter = reporterProvider.createReporter(ds, experimentId) + val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run.seed) + val environmentReader = createEnvironmentReader(run.scenario.topology.name) + run.scenario(run, reporter, environmentReader, traceReader) + } + + /** + * Run the portfolios. + */ + @OptIn(ExperimentalStdlibApi::class) + public fun run() { + experimentId = helper.createExperiment() + logger.info { "Creating execution plan for experiment $experimentId" } + + val plan = createPlan() + val total = plan.size + val finished = AtomicInteger() + val dispatcher = Executors.newWorkStealingPool().asCoroutineDispatcher() + + runBlocking { + val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!! + for (run in plan) { + val scenarioId = scenarioIds[run.scenario]!! + launch(dispatcher) { + launch(mainDispatcher) { + helper.startRun(scenarioId, run.id) + } + + logger.info { "[${finished.get()}/$total] Starting run ($scenarioId, ${run.id})" } + + try { + + val duration = measureTimeMillis { + run(run) + } + + finished.incrementAndGet() + logger.info { "[${finished.get()}/$total] Finished run ($scenarioId, ${run.id}) in $duration milliseconds" } + + withContext(mainDispatcher) { + helper.finishRun(scenarioId, run.id, hasFailed = false) + } + } catch (e: Throwable) { + logger.error("A run has failed", e) + finished.incrementAndGet() + withContext(mainDispatcher) { + helper.finishRun(scenarioId, run.id, hasFailed = true) + } + } + } + } + } + } + + override fun close() { + helper.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index b2fbba39..80a91dec 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -24,52 +24,27 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.default import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions import com.github.ajalt.clikt.parameters.groups.required import com.github.ajalt.clikt.parameters.options.convert import com.github.ajalt.clikt.parameters.options.default import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.flag import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int -import com.github.ajalt.clikt.parameters.types.long -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import com.zaxxer.hikari.HikariDataSource import mu.KotlinLogging import java.io.File -import java.io.FileReader import java.io.InputStream -import java.sql.DriverManager -import java.util.ServiceLoader -import kotlin.random.Random +import javax.sql.DataSource /** * The logger for this experiment. @@ -80,10 +55,15 @@ private val logger = KotlinLogging.logger {} * Represents the command for running the experiment. */ class ExperimentCli : CliktCommand(name = "sc20-experiment") { - private val environment by option("--environment-file", help = "path to the environment file") - .file() + private val environmentPath by option("--environment-path", help = "path to the environment directory") + .file(canBeFile = false) + .required() + + private val tracePath by option("--trace-path", help = "path to the traces directory") + .file(canBeFile = false) .required() - private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file") + + private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file") .file() .convert { it.inputStream() as InputStream } .defaultLazy { ExperimentCli::class.java.getResourceAsStream("/env/performance-interference.json") } @@ -95,180 +75,53 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { } .default(emptyMap()) - private val selectedVms by mutuallyExclusiveOptions( - option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) }, - option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) } - ).default(emptyList()) - - private val seed by option(help = "the random seed") - .int() - .default(0) - private val failures by option("-x", "--failures", help = "enable (correlated) machine failures") - .flag() - private val failureInterval by option(help = "expected number of hours between failures") - .int() - .default(24 * 7) // one week - private val allocationPolicy by option(help = "name of VM allocation policy to use") - .choice( - "mem", "mem-inv", - "core-mem", "core-mem-inv", - "active-servers", "active-servers-inv", - "provisioned-cores", "provisioned-cores-inv", - "random", "replay" - ) - .default("core-mem") - - private val trace by option().groupChoice( - "sc20-parquet" to Trace.Sc20Parquet() - ).required() - private val reporter by option().groupChoice( "parquet" to Reporter.Parquet(), "postgres" to Reporter.Postgres() ).required() - private fun parseVMs(string: String): List { - // Handle case where VM list contains a VM name with an (escaped) single-quote in it - val sanitizedString = string.replace("\\'", "\\\\[") - .replace("'", "\"") - .replace("\\\\[", "'") - val vms: List = jacksonObjectMapper().readValue(sanitizedString) - return vms - } + private val jdbcUrl by option("--jdbc-url", help = "JDBC connection url").required() override fun run() { - logger.info("seed: $seed") - logger.info("failures: $failures") - logger.info("allocation-policy: $allocationPolicy") - - val start = System.currentTimeMillis() - val reporter: ExperimentReporter = reporter.createReporter() - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = when (this.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") - } - - val performanceInterferenceModel = try { - Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct() - } catch (e: Throwable) { - reporter.close() - throw e - } - val environmentReader = Sc20ClusterEnvironmentReader(environment) - val traceReader = try { - trace.createTraceReader(performanceInterferenceModel, selectedVms, seed) - } catch (e: Throwable) { - reporter.close() - throw e - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (failures) { - logger.info("ENABLING failures") - createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, reporter) - processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + val ds = HikariDataSource() + ds.jdbcUrl = jdbcUrl + ds.addDataSourceProperty("reWriteBatchedInserts", "true") + + val portfolios = listOf( + HorVerPortfolio // , + // MoreVelocityPortfolio, + // MoreHpcPortfolio, + // OperationalPhenomenaPortfolio + ) - failureDomain?.cancel() - scheduler.terminate() - logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") - } + val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() - runBlocking { - system.run() - system.terminate() + try { + val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel) + runner.run() + } finally { + ds.close() } - - // Explicitly close the monitor to flush its buffer - reporter.close() } } /** * An option for specifying the type of reporter to use. */ -internal sealed class Reporter(name: String) : OptionGroup(name) { - /** - * Create the [ExperimentReporter] for this option. - */ - abstract fun createReporter(): ExperimentReporter - +internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider { class Parquet : Reporter("Options for reporting using Parquet") { - private val path by option("--parquet-path", help = "path to where the output should be stored") + private val path by option("--parquet-directory", help = "path to where the output should be stored") .file() - .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } + .defaultLazy { File("data") } - override fun createReporter(): ExperimentReporter = - ExperimentParquetReporter(path) + override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter = + ExperimentParquetReporter(File(path, "results-${System.currentTimeMillis()}.parquet")) } class Postgres : Reporter("Options for reporting using PostgreSQL") { - private val url by option("--postgres-url", help = "JDBC connection url").required() - private val experimentId by option(help = "Experiment ID").long().required() - - override fun createReporter(): ExperimentReporter { - val conn = DriverManager.getConnection(url) - return ExperimentPostgresReporter(conn, experimentId) - } - } -} - -/** - * An option for specifying the type of trace to use. - */ -internal sealed class Trace(type: String) : OptionGroup(type) { - /** - * Create a [TraceReader] for this type of trace. - */ - abstract fun createTraceReader(performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): TraceReader - - class Sc20Parquet : Trace("SC20 Parquet format") { - /** - * Path to trace directory. - */ - private val path by option("--trace-path", help = "path to the trace directory") - .file(canBeFile = false) - .required() - - override fun createTraceReader( - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List, - seed: Int - ): TraceReader { - return Sc20ParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) - } + override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter = + ExperimentPostgresReporter(ds.connection, experimentId) } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt new file mode 100644 index 00000000..34505fce --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * A portfolio represents a collection of scenarios are tested. + */ +public abstract class Portfolio(val name: String) { + /** + * The scenarios of this portfolio consists of. + */ + abstract val scenarios: Sequence +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt new file mode 100644 index 00000000..0e612ae0 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt @@ -0,0 +1,163 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) { + abstract val topologies: List + abstract val workloads: List + abstract val operationalPhenomena: List> + abstract val allocationPolicies: List + + open val repetitions = 2 + + override val scenarios: Sequence = sequence { + for (topology in topologies) { + for (workload in workloads) { + for ((hasFailures, hasInterference) in operationalPhenomena) { + for (allocationPolicy in allocationPolicies) { + yield( + Scenario( + this@AbstractSc20Portfolio, + repetitions, + topology, + workload, + allocationPolicy, + hasFailures, + hasInterference + ) + ) + } + } + } + } + } +} + +object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vol-hor-hom"), + Topology("rep-vol-hor-het"), + Topology("rep-vol-ver-hom"), + Topology("rep-vol-ver-het"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-hor-het"), + Topology("exp-vol-ver-hom"), + Topology("exp-vol-ver-het") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("small-parquet", 0.5), + Workload("small-parquet", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vel-ver-hom"), + Topology("rep-vel-ver-het"), + Topology("exp-vel-ver-hom"), + Topology("exp-vel-ver-het") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +object MoreHpcPortfolio : AbstractSc20Portfolio("more_velocity") { + override val topologies = listOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("more_velocity") { + override val topologies = listOf( + Topology("base") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true, + false to true, + true to false, + true to true + ) + + override val allocationPolicies = listOf( + "mem", + "mem-inv", + "core-mem", + "core-mem-inv", + "active-servers", + "active-servers-inv", + "random" + ) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt new file mode 100644 index 00000000..b2151b16 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt @@ -0,0 +1,31 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the + * same set of parameters. + */ +public data class Run(val scenario: Scenario, val id: Int, val seed: Int) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt new file mode 100644 index 00000000..66a8babf --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import mu.KotlinLogging +import java.util.ServiceLoader +import kotlin.random.Random + +/** + * The logger for the experiment scenario. + */ +private val logger = KotlinLogging.logger {} + +/** + * The provider for the simulation engine to use. + */ +private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + +/** + * A scenario represents a single point in the design space (a unique combination of parameters). + */ +public class Scenario( + val portfolio: Portfolio, + val repetitions: Int, + val topology: Topology, + val workload: Workload, + val allocationPolicy: String, + val hasFailures: Boolean, + val hasInterference: Boolean, + val failureInterval: Int = 24 * 7 +) { + /** + * The runs this scenario consists of. + */ + public val runs: Sequence = sequence { + repeat(repetitions) { i -> + yield(Run(this@Scenario, i, i)) + } + } + + /** + * Perform a single run of this scenario. + */ + public operator fun invoke(run: Run, reporter: ExperimentReporter, environment: EnvironmentReader, trace: TraceReader) { + val system = provider("experiment-${run.id}") + val root = system.newDomain("root") + val seeder = Random(run.seed) + + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = when (this.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + "replay" -> ReplayAllocationPolicy(emptyMap()) + else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") + } + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environment, allocationPolicy) + + val failureDomain = if (hasFailures) { + logger.debug("ENABLING failures") + createFailureDomain(seeder.nextInt(), failureInterval, bareMetalProvisioner, chan) + } else { + null + } + + attachMonitor(scheduler, reporter) + processTrace(trace, scheduler, chan, reporter, emptyMap()) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + runBlocking { + system.run() + system.terminate() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt new file mode 100644 index 00000000..d2be9599 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * The datacenter topology on which we test the workload. + */ +public data class Topology(val name: String) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt new file mode 100644 index 00000000..4ab5ec8c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * A workload that is considered for a scenario. + */ +public class Workload(val name: String, val fraction: Double) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt new file mode 100644 index 00000000..d0dfd2e8 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt @@ -0,0 +1,31 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import javax.sql.DataSource + +interface ExperimentReporterProvider { + public fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt deleted file mode 100644 index 8a204ca3..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ /dev/null @@ -1,301 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.trace - -import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment -import com.atlarge.opendc.compute.core.image.VmImage -import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.core.User -import com.atlarge.opendc.format.trace.TraceEntry -import com.atlarge.opendc.format.trace.TraceReader -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -@OptIn(ExperimentalStdlibApi::class) -class Sc20ParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List, - random: Random -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - )) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = FlopsHistoryFragment( - tick, - flops, - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf() - val entries = mutableMapOf() - val buffers = mutableMapOf>>() - - val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info("Processing VM $id") - - val internalBuffer = mutableListOf() - val externalBuffer = mutableListOf() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - repeat@while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - if (fragment.tick >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), - Random(random.nextInt()) - ) - val vmWorkload = VmWorkload( - uid, "VM Workload $id", - UnnamedUser, - VmImage( - uid, - id, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - fragments, - maxCores, - requiredMemory - ) - ) - - TraceEntryImpl( - submissionTime, - vmWorkload - ) - } - .sortedBy { it.submissionTime } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt new file mode 100644 index 00000000..8a204ca3 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -0,0 +1,301 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.io.api.Binary +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20ParquetTraceReader( + traceFile: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List, + random: Random +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * The intermediate buffer to store the read records in. + */ + private val queue = ArrayBlockingQueue>(1024) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) + )) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map>>) { + if (!hasNext) { + return + } + + val fragments = mutableListOf>() + queue.drainTo(fragments) + + for ((id, fragment) in fragments) { + if (id == poison.first) { + hasNext = false + return + } + buffers[id]?.forEach { it.add(fragment) } + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. + */ + init { + val takenIds = mutableSetOf() + val entries = mutableMapOf() + val buffers = mutableMapOf>>() + + val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + entries[id] = record + } + + metaReader.close() + + val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + logger.info("Processing VM $id") + + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence { + repeat@while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } + } + + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() + + for (fragment in internalBuffer) { + yield(fragment) + + if (fragment.tick >= endTime) { + break@repeat + } + } + + internalBuffer.clear() + } + + buffers.remove(id) + } + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uid, "VM Workload $id", + UnnamedUser, + VmImage( + uid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) + + TraceEntryImpl( + submissionTime, + vmWorkload + ) + } + .sortedBy { it.submissionTime } + .toList() + .iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() { + readerThread.interrupt() + } + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt new file mode 100644 index 00000000..0292a2e3 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt @@ -0,0 +1,160 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.util + +import com.atlarge.opendc.experiments.sc20.Portfolio +import com.atlarge.opendc.experiments.sc20.Run +import com.atlarge.opendc.experiments.sc20.Scenario +import java.io.Closeable +import java.sql.Connection +import java.sql.Statement + +/** + * A helper class for writing to the database. + */ +class DatabaseHelper(val conn: Connection) : Closeable { + /** + * Prepared statement to create experiment. + */ + private val createExperiment = conn.prepareStatement("INSERT INTO experiments DEFAULT VALUES", arrayOf("id")) + + /** + * Prepared statement for creating a portfolio. + */ + private val createPortfolio = conn.prepareStatement("INSERT INTO portfolios (experiment_id, name) VALUES (?, ?)", arrayOf("id")) + + /** + * Prepared statement for creating a scenario + */ + private val createScenario = conn.prepareStatement("INSERT INTO scenarios (portfolio_id, repetitions, topology, workload_name, workload_fraction, allocation_policy, failures, interference) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", arrayOf("id")) + + /** + * Prepared statement for creating a run. + */ + private val createRun = conn.prepareStatement("INSERT INTO runs (id, scenario_id, seed) VALUES (?, ?, ?)", arrayOf("id")) + + /** + * Prepared statement for starting a run. + */ + private val startRun = conn.prepareStatement("UPDATE runs SET state = 'active'::run_state,start_time = now() WHERE id = ? AND scenario_id = ?") + + /** + * Prepared statement for finishing a run. + */ + private val finishRun = conn.prepareStatement("UPDATE runs SET state = ?::run_state,end_time = now() WHERE id = ? AND scenario_id = ?") + + /** + * Create a new experiment and return its id. + */ + fun createExperiment(): Long { + val affectedRows = createExperiment.executeUpdate() + check(affectedRows != 0) + return createExperiment.latestId + } + + /** + * Persist a [Portfolio] and return its id. + */ + fun persist(portfolio: Portfolio, experimentId: Long): Long { + createPortfolio.setLong(1, experimentId) + createPortfolio.setString(2, portfolio.name) + + val affectedRows = createPortfolio.executeUpdate() + check(affectedRows != 0) + return createPortfolio.latestId + } + + /** + * Persist a [Scenario] and return its id. + */ + fun persist(scenario: Scenario, portfolioId: Long): Long { + createScenario.setLong(1, portfolioId) + createScenario.setInt(2, scenario.repetitions) + createScenario.setString(3, scenario.topology.name) + createScenario.setString(4, scenario.workload.name) + createScenario.setDouble(5, scenario.workload.fraction) + createScenario.setString(6, scenario.allocationPolicy) + createScenario.setBoolean(7, scenario.hasFailures) + createScenario.setBoolean(8, scenario.hasInterference) + + val affectedRows = createScenario.executeUpdate() + check(affectedRows != 0) + return createScenario.latestId + } + + /** + * Persist a [Run] and return its id. + */ + fun persist(run: Run, scenarioId: Long): Int { + createRun.setInt(1, run.id) + createRun.setLong(2, scenarioId) + createRun.setInt(3, run.seed) + + val affectedRows = createRun.executeUpdate() + check(affectedRows != 0) + return createRun.latestId.toInt() + } + + /** + * Start run. + */ + fun startRun(scenario: Long, run: Int) { + startRun.setInt(1, run) + startRun.setLong(2, scenario) + + val affectedRows = startRun.executeUpdate() + check(affectedRows != 0) + } + + /** + * Finish a run. + */ + fun finishRun(scenario: Long, run: Int, hasFailed: Boolean) { + finishRun.setString(1, if (hasFailed) "fail" else "ok") + finishRun.setInt(2, run) + finishRun.setLong(3, scenario) + + val affectedRows = finishRun.executeUpdate() + check(affectedRows != 0) + } + + /** + * Obtain the latest identifier of the specified statement. + */ + private val Statement.latestId: Long + get() { + val rs = generatedKeys + return try { + check(rs.next()) + rs.getLong(1) + } finally { + rs.close() + } + } + + override fun close() { + conn.close() + } +} -- cgit v1.2.3