diff options
13 files changed, 895 insertions, 211 deletions
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index df291039..b7440792 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -46,6 +46,7 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + implementation("com.zaxxer:HikariCP:3.4.5") runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql index 677240fa..515348c6 100644 --- a/opendc/opendc-experiments-sc20/schema.sql +++ b/opendc/opendc-experiments-sc20/schema.sql @@ -1,13 +1,26 @@ --- A portfolio represents a collection of scenarios are tested. -DROP TABLE IF EXISTS portfolios; +-- An experiment represents a collection of portfolios. +DROP TABLE IF EXISTS experiments CASCADE; +CREATE TABLE experiments +( + id BIGSERIAL PRIMARY KEY NOT NULL, + creation_time TIMESTAMP NOT NULL DEFAULT (now()) +); + +-- A portfolio represents a collection of scenarios tested. +DROP TABLE IF EXISTS portfolios CASCADE; CREATE TABLE portfolios ( - id BIGSERIAL PRIMARY KEY NOT NULL, - name TEXT NOT NULL + id BIGSERIAL PRIMARY KEY NOT NULL, + experiment_id BIGINT NOT NULL, + name TEXT NOT NULL, + + FOREIGN KEY (experiment_id) REFERENCES experiments (id) + ON DELETE CASCADE + ON UPDATE CASCADE ); -- A scenario represents a single point in the design space (a unique combination of parameters) -DROP TABLE IF EXISTS scenarios; +DROP TABLE IF EXISTS scenarios CASCADE; CREATE TABLE scenarios ( id BIGSERIAL PRIMARY KEY NOT NULL, @@ -17,27 +30,30 @@ CREATE TABLE scenarios workload_name TEXT NOT NULL, workload_fraction DOUBLE PRECISION NOT NULL, allocation_policy TEXT NOT NULL, - failures BIT NOT NULL, - interference BIT NOT NULL, + failures BOOLEAN NOT NULL, + interference BOOLEAN NOT NULL, FOREIGN KEY (portfolio_id) REFERENCES portfolios (id) ON DELETE CASCADE ON UPDATE CASCADE + ); +DROP TYPE IF EXISTS run_state CASCADE; CREATE TYPE run_state AS ENUM ('wait', 'active', 'fail', 'ok'); -- An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the -- same set of parameters. -DROP TABLE IF EXISTS runs; +DROP TABLE IF EXISTS runs CASCADE; CREATE TABLE runs ( - id INTEGER NOT NULL, - scenario_id BIGINT NOT NULL, - seed INTEGER NOT NULL, - state run_state NOT NULL DEFAULT 'wait'::run_state, - start_time TIMESTAMP NOT NULL, - end_time TIMESTAMP, + id INTEGER NOT NULL, + scenario_id BIGINT NOT NULL, + seed INTEGER NOT NULL, + state run_state NOT NULL DEFAULT 'wait'::run_state, + submit_time TIMESTAMP NOT NULL DEFAULT (now()), + start_time TIMESTAMP DEFAULT (NULL), + end_time TIMESTAMP DEFAULT (NULL), PRIMARY KEY (scenario_id, id), FOREIGN KEY (scenario_id) REFERENCES scenarios (id) @@ -46,7 +62,7 @@ CREATE TABLE runs ); -- Metrics of the hypervisors reported per slice -DROP TABLE IF EXISTS host_metrics; +DROP TABLE IF EXISTS host_metrics CASCADE; CREATE TABLE host_metrics ( id BIGSERIAL PRIMARY KEY NOT NULL, @@ -70,10 +86,11 @@ CREATE TABLE host_metrics ON UPDATE CASCADE ); +DROP INDEX IF EXISTS host_metrics_idx; CREATE INDEX host_metrics_idx ON host_metrics (scenario_id, run_id, timestamp, host_id); -- Metrics of the VMs reported per slice -DROP TABLE IF EXISTS vm_metrics; +DROP TABLE IF EXISTS vm_metrics CASCADE; CREATE TABLE vm_metrics ( id BIGSERIAL PRIMARY KEY NOT NULL, @@ -96,27 +113,29 @@ CREATE TABLE vm_metrics ON UPDATE CASCADE ); +DROP INDEX IF EXISTS vm_metrics_idx; CREATE INDEX vm_metrics_idx ON vm_metrics (scenario_id, run_id, timestamp, vm_id); -- Metrics of the provisioner reported per change -DROP TABLE IF EXISTS provisioner_metrics; +DROP TABLE IF EXISTS provisioner_metrics CASCADE; CREATE TABLE provisioner_metrics ( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - timestamp TIMESTAMP NOT NULL, - host_total_count INTEGER NOT NULL, - host_available_count INTEGER NOT NULL, - vm_total_count INTEGER NOT NULL, - vm_active_count INTEGER NOT NULL, - vm_inactive_count INTEGER NOT NULL, - vm_waiting_count INTEGER NOT NULL, - vm_failed_count INTEGER NOT NULL, + id BIGSERIAL PRIMARY KEY NOT NULL, + scenario_id BIGINT NOT NULL, + run_id INTEGER NOT NULL, + timestamp TIMESTAMP NOT NULL, + host_total_count INTEGER NOT NULL, + host_available_count INTEGER NOT NULL, + vm_total_count INTEGER NOT NULL, + vm_active_count INTEGER NOT NULL, + vm_inactive_count INTEGER NOT NULL, + vm_waiting_count INTEGER NOT NULL, + vm_failed_count INTEGER NOT NULL, FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) ON DELETE CASCADE ON UPDATE CASCADE ); +DROP INDEX IF EXISTS provisioner_metrics_idx; CREATE INDEX provisioner_metrics_idx ON provisioner_metrics (scenario_id, run_id, timestamp); diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt new file mode 100644 index 00000000..a9429367 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -0,0 +1,202 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import mu.KotlinLogging +import java.io.Closeable +import java.io.File +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger +import javax.sql.DataSource +import kotlin.random.Random +import kotlin.system.measureTimeMillis + +/** + * The logger for the experiment runner. + */ +private val logger = KotlinLogging.logger {} + +/** + * The experiment runner is responsible for orchestrating the simulation runs of an experiment. + * + * @param portfolios The portfolios to consider. + * @param ds The data source to write the experimental results to. + */ +public class ExperimentRunner( + private val portfolios: List<Portfolio>, + private val ds: DataSource, + private val reporterProvider: ExperimentReporterProvider, + private val environmentPath: File, + private val tracePath: File, + private val performanceInterferenceModel: PerformanceInterferenceModel +) : Closeable { + /** + * The database helper to write the execution plan. + */ + private val helper = DatabaseHelper(ds.connection) + + /** + * The experiment identifier. + */ + private var experimentId = -1L + + /** + * The mapping of portfolios to their ids. + */ + private val portfolioIds = mutableMapOf<Portfolio, Long>() + + /** + * The mapping of scenarios to their ids. + */ + private val scenarioIds = mutableMapOf<Scenario, Long>() + + /** + * Create an execution plan + */ + private fun createPlan(): List<Run> { + val runs = mutableListOf<Run>() + + for (portfolio in portfolios) { + val portfolioId = helper.persist(portfolio, experimentId) + portfolioIds[portfolio] = portfolioId + var scenarios = 0 + var runCount = 0 + + for (scenario in portfolio.scenarios) { + val scenarioId = helper.persist(scenario, portfolioId) + scenarioIds[scenario] = scenarioId + scenarios++ + + for (run in scenario.runs) { + helper.persist(run, scenarioId) + runCount++ + runs.add(run) + } + } + + logger.info { "Portfolio $portfolioId: ${portfolio.name} ($scenarios scenarios, $runCount runs total)" } + } + + return runs + } + + /** + * Create a trace reader for the specified trace. + */ + private fun createTraceReader( + name: String, + performanceInterferenceModel: PerformanceInterferenceModel, + seed: Int + ): TraceReader<VmWorkload> { + return Sc20ParquetTraceReader( + File(tracePath, name), + performanceInterferenceModel, + emptyList(), + Random(seed) + ) + } + + /** + * Create the environment reader for the specified environment. + */ + private fun createEnvironmentReader(name: String): EnvironmentReader { + return Sc20ClusterEnvironmentReader(File(environmentPath, "$name.txt")) + } + + /** + * Run the specified run. + */ + private fun run(run: Run) { + val reporter = reporterProvider.createReporter(ds, experimentId) + val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run.seed) + val environmentReader = createEnvironmentReader(run.scenario.topology.name) + run.scenario(run, reporter, environmentReader, traceReader) + } + + /** + * Run the portfolios. + */ + @OptIn(ExperimentalStdlibApi::class) + public fun run() { + experimentId = helper.createExperiment() + logger.info { "Creating execution plan for experiment $experimentId" } + + val plan = createPlan() + val total = plan.size + val finished = AtomicInteger() + val dispatcher = Executors.newWorkStealingPool().asCoroutineDispatcher() + + runBlocking { + val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!! + for (run in plan) { + val scenarioId = scenarioIds[run.scenario]!! + launch(dispatcher) { + launch(mainDispatcher) { + helper.startRun(scenarioId, run.id) + } + + logger.info { "[${finished.get()}/$total] Starting run ($scenarioId, ${run.id})" } + + try { + + val duration = measureTimeMillis { + run(run) + } + + finished.incrementAndGet() + logger.info { "[${finished.get()}/$total] Finished run ($scenarioId, ${run.id}) in $duration milliseconds" } + + withContext(mainDispatcher) { + helper.finishRun(scenarioId, run.id, hasFailed = false) + } + } catch (e: Throwable) { + logger.error("A run has failed", e) + finished.incrementAndGet() + withContext(mainDispatcher) { + helper.finishRun(scenarioId, run.id, hasFailed = true) + } + } + } + } + } + } + + override fun close() { + helper.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index b2fbba39..80a91dec 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -24,52 +24,27 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.default import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions import com.github.ajalt.clikt.parameters.groups.required import com.github.ajalt.clikt.parameters.options.convert import com.github.ajalt.clikt.parameters.options.default import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.flag import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int -import com.github.ajalt.clikt.parameters.types.long -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import com.zaxxer.hikari.HikariDataSource import mu.KotlinLogging import java.io.File -import java.io.FileReader import java.io.InputStream -import java.sql.DriverManager -import java.util.ServiceLoader -import kotlin.random.Random +import javax.sql.DataSource /** * The logger for this experiment. @@ -80,10 +55,15 @@ private val logger = KotlinLogging.logger {} * Represents the command for running the experiment. */ class ExperimentCli : CliktCommand(name = "sc20-experiment") { - private val environment by option("--environment-file", help = "path to the environment file") - .file() + private val environmentPath by option("--environment-path", help = "path to the environment directory") + .file(canBeFile = false) + .required() + + private val tracePath by option("--trace-path", help = "path to the traces directory") + .file(canBeFile = false) .required() - private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file") + + private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file") .file() .convert { it.inputStream() as InputStream } .defaultLazy { ExperimentCli::class.java.getResourceAsStream("/env/performance-interference.json") } @@ -95,180 +75,53 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { } .default(emptyMap()) - private val selectedVms by mutuallyExclusiveOptions( - option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) }, - option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) } - ).default(emptyList()) - - private val seed by option(help = "the random seed") - .int() - .default(0) - private val failures by option("-x", "--failures", help = "enable (correlated) machine failures") - .flag() - private val failureInterval by option(help = "expected number of hours between failures") - .int() - .default(24 * 7) // one week - private val allocationPolicy by option(help = "name of VM allocation policy to use") - .choice( - "mem", "mem-inv", - "core-mem", "core-mem-inv", - "active-servers", "active-servers-inv", - "provisioned-cores", "provisioned-cores-inv", - "random", "replay" - ) - .default("core-mem") - - private val trace by option().groupChoice( - "sc20-parquet" to Trace.Sc20Parquet() - ).required() - private val reporter by option().groupChoice( "parquet" to Reporter.Parquet(), "postgres" to Reporter.Postgres() ).required() - private fun parseVMs(string: String): List<String> { - // Handle case where VM list contains a VM name with an (escaped) single-quote in it - val sanitizedString = string.replace("\\'", "\\\\[") - .replace("'", "\"") - .replace("\\\\[", "'") - val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString) - return vms - } + private val jdbcUrl by option("--jdbc-url", help = "JDBC connection url").required() override fun run() { - logger.info("seed: $seed") - logger.info("failures: $failures") - logger.info("allocation-policy: $allocationPolicy") - - val start = System.currentTimeMillis() - val reporter: ExperimentReporter = reporter.createReporter() - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = when (this.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") - } - - val performanceInterferenceModel = try { - Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct() - } catch (e: Throwable) { - reporter.close() - throw e - } - val environmentReader = Sc20ClusterEnvironmentReader(environment) - val traceReader = try { - trace.createTraceReader(performanceInterferenceModel, selectedVms, seed) - } catch (e: Throwable) { - reporter.close() - throw e - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (failures) { - logger.info("ENABLING failures") - createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, reporter) - processTrace(traceReader, scheduler, chan, reporter, vmPlacements) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + val ds = HikariDataSource() + ds.jdbcUrl = jdbcUrl + ds.addDataSourceProperty("reWriteBatchedInserts", "true") + + val portfolios = listOf( + HorVerPortfolio // , + // MoreVelocityPortfolio, + // MoreHpcPortfolio, + // OperationalPhenomenaPortfolio + ) - failureDomain?.cancel() - scheduler.terminate() - logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds") - } + val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() - runBlocking { - system.run() - system.terminate() + try { + val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel) + runner.run() + } finally { + ds.close() } - - // Explicitly close the monitor to flush its buffer - reporter.close() } } /** * An option for specifying the type of reporter to use. */ -internal sealed class Reporter(name: String) : OptionGroup(name) { - /** - * Create the [ExperimentReporter] for this option. - */ - abstract fun createReporter(): ExperimentReporter - +internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider { class Parquet : Reporter("Options for reporting using Parquet") { - private val path by option("--parquet-path", help = "path to where the output should be stored") + private val path by option("--parquet-directory", help = "path to where the output should be stored") .file() - .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } + .defaultLazy { File("data") } - override fun createReporter(): ExperimentReporter = - ExperimentParquetReporter(path) + override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter = + ExperimentParquetReporter(File(path, "results-${System.currentTimeMillis()}.parquet")) } class Postgres : Reporter("Options for reporting using PostgreSQL") { - private val url by option("--postgres-url", help = "JDBC connection url").required() - private val experimentId by option(help = "Experiment ID").long().required() - - override fun createReporter(): ExperimentReporter { - val conn = DriverManager.getConnection(url) - return ExperimentPostgresReporter(conn, experimentId) - } - } -} - -/** - * An option for specifying the type of trace to use. - */ -internal sealed class Trace(type: String) : OptionGroup(type) { - /** - * Create a [TraceReader] for this type of trace. - */ - abstract fun createTraceReader(performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): TraceReader<VmWorkload> - - class Sc20Parquet : Trace("SC20 Parquet format") { - /** - * Path to trace directory. - */ - private val path by option("--trace-path", help = "path to the trace directory") - .file(canBeFile = false) - .required() - - override fun createTraceReader( - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List<String>, - seed: Int - ): TraceReader<VmWorkload> { - return Sc20ParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) - } + override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter = + ExperimentPostgresReporter(ds.connection, experimentId) } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt new file mode 100644 index 00000000..34505fce --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * A portfolio represents a collection of scenarios are tested. + */ +public abstract class Portfolio(val name: String) { + /** + * The scenarios of this portfolio consists of. + */ + abstract val scenarios: Sequence<Scenario> +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt new file mode 100644 index 00000000..0e612ae0 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt @@ -0,0 +1,163 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) { + abstract val topologies: List<Topology> + abstract val workloads: List<Workload> + abstract val operationalPhenomena: List<Pair<Boolean, Boolean>> + abstract val allocationPolicies: List<String> + + open val repetitions = 2 + + override val scenarios: Sequence<Scenario> = sequence { + for (topology in topologies) { + for (workload in workloads) { + for ((hasFailures, hasInterference) in operationalPhenomena) { + for (allocationPolicy in allocationPolicies) { + yield( + Scenario( + this@AbstractSc20Portfolio, + repetitions, + topology, + workload, + allocationPolicy, + hasFailures, + hasInterference + ) + ) + } + } + } + } + } +} + +object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vol-hor-hom"), + Topology("rep-vol-hor-het"), + Topology("rep-vol-ver-hom"), + Topology("rep-vol-ver-het"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-hor-het"), + Topology("exp-vol-ver-hom"), + Topology("exp-vol-ver-het") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("small-parquet", 0.5), + Workload("small-parquet", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vel-ver-hom"), + Topology("rep-vel-ver-het"), + Topology("exp-vel-ver-hom"), + Topology("exp-vel-ver-het") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +object MoreHpcPortfolio : AbstractSc20Portfolio("more_velocity") { + override val topologies = listOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("more_velocity") { + override val topologies = listOf( + Topology("base") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena = listOf( + true to true, + false to true, + true to false, + true to true + ) + + override val allocationPolicies = listOf( + "mem", + "mem-inv", + "core-mem", + "core-mem-inv", + "active-servers", + "active-servers-inv", + "random" + ) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt new file mode 100644 index 00000000..b2151b16 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt @@ -0,0 +1,31 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the + * same set of parameters. + */ +public data class Run(val scenario: Scenario, val id: Int, val seed: Int) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt new file mode 100644 index 00000000..66a8babf --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import mu.KotlinLogging +import java.util.ServiceLoader +import kotlin.random.Random + +/** + * The logger for the experiment scenario. + */ +private val logger = KotlinLogging.logger {} + +/** + * The provider for the simulation engine to use. + */ +private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + +/** + * A scenario represents a single point in the design space (a unique combination of parameters). + */ +public class Scenario( + val portfolio: Portfolio, + val repetitions: Int, + val topology: Topology, + val workload: Workload, + val allocationPolicy: String, + val hasFailures: Boolean, + val hasInterference: Boolean, + val failureInterval: Int = 24 * 7 +) { + /** + * The runs this scenario consists of. + */ + public val runs: Sequence<Run> = sequence { + repeat(repetitions) { i -> + yield(Run(this@Scenario, i, i)) + } + } + + /** + * Perform a single run of this scenario. + */ + public operator fun invoke(run: Run, reporter: ExperimentReporter, environment: EnvironmentReader, trace: TraceReader<VmWorkload>) { + val system = provider("experiment-${run.id}") + val root = system.newDomain("root") + val seeder = Random(run.seed) + + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = when (this.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + "replay" -> ReplayAllocationPolicy(emptyMap()) + else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") + } + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environment, allocationPolicy) + + val failureDomain = if (hasFailures) { + logger.debug("ENABLING failures") + createFailureDomain(seeder.nextInt(), failureInterval, bareMetalProvisioner, chan) + } else { + null + } + + attachMonitor(scheduler, reporter) + processTrace(trace, scheduler, chan, reporter, emptyMap()) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + runBlocking { + system.run() + system.terminate() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt new file mode 100644 index 00000000..d2be9599 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * The datacenter topology on which we test the workload. + */ +public data class Topology(val name: String) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt new file mode 100644 index 00000000..4ab5ec8c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +/** + * A workload that is considered for a scenario. + */ +public class Workload(val name: String, val fraction: Double) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt new file mode 100644 index 00000000..d0dfd2e8 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt @@ -0,0 +1,31 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import javax.sql.DataSource + +interface ExperimentReporterProvider { + public fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index 8a204ca3..8a204ca3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt new file mode 100644 index 00000000..0292a2e3 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt @@ -0,0 +1,160 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.util + +import com.atlarge.opendc.experiments.sc20.Portfolio +import com.atlarge.opendc.experiments.sc20.Run +import com.atlarge.opendc.experiments.sc20.Scenario +import java.io.Closeable +import java.sql.Connection +import java.sql.Statement + +/** + * A helper class for writing to the database. + */ +class DatabaseHelper(val conn: Connection) : Closeable { + /** + * Prepared statement to create experiment. + */ + private val createExperiment = conn.prepareStatement("INSERT INTO experiments DEFAULT VALUES", arrayOf("id")) + + /** + * Prepared statement for creating a portfolio. + */ + private val createPortfolio = conn.prepareStatement("INSERT INTO portfolios (experiment_id, name) VALUES (?, ?)", arrayOf("id")) + + /** + * Prepared statement for creating a scenario + */ + private val createScenario = conn.prepareStatement("INSERT INTO scenarios (portfolio_id, repetitions, topology, workload_name, workload_fraction, allocation_policy, failures, interference) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", arrayOf("id")) + + /** + * Prepared statement for creating a run. + */ + private val createRun = conn.prepareStatement("INSERT INTO runs (id, scenario_id, seed) VALUES (?, ?, ?)", arrayOf("id")) + + /** + * Prepared statement for starting a run. + */ + private val startRun = conn.prepareStatement("UPDATE runs SET state = 'active'::run_state,start_time = now() WHERE id = ? AND scenario_id = ?") + + /** + * Prepared statement for finishing a run. + */ + private val finishRun = conn.prepareStatement("UPDATE runs SET state = ?::run_state,end_time = now() WHERE id = ? AND scenario_id = ?") + + /** + * Create a new experiment and return its id. + */ + fun createExperiment(): Long { + val affectedRows = createExperiment.executeUpdate() + check(affectedRows != 0) + return createExperiment.latestId + } + + /** + * Persist a [Portfolio] and return its id. + */ + fun persist(portfolio: Portfolio, experimentId: Long): Long { + createPortfolio.setLong(1, experimentId) + createPortfolio.setString(2, portfolio.name) + + val affectedRows = createPortfolio.executeUpdate() + check(affectedRows != 0) + return createPortfolio.latestId + } + + /** + * Persist a [Scenario] and return its id. + */ + fun persist(scenario: Scenario, portfolioId: Long): Long { + createScenario.setLong(1, portfolioId) + createScenario.setInt(2, scenario.repetitions) + createScenario.setString(3, scenario.topology.name) + createScenario.setString(4, scenario.workload.name) + createScenario.setDouble(5, scenario.workload.fraction) + createScenario.setString(6, scenario.allocationPolicy) + createScenario.setBoolean(7, scenario.hasFailures) + createScenario.setBoolean(8, scenario.hasInterference) + + val affectedRows = createScenario.executeUpdate() + check(affectedRows != 0) + return createScenario.latestId + } + + /** + * Persist a [Run] and return its id. + */ + fun persist(run: Run, scenarioId: Long): Int { + createRun.setInt(1, run.id) + createRun.setLong(2, scenarioId) + createRun.setInt(3, run.seed) + + val affectedRows = createRun.executeUpdate() + check(affectedRows != 0) + return createRun.latestId.toInt() + } + + /** + * Start run. + */ + fun startRun(scenario: Long, run: Int) { + startRun.setInt(1, run) + startRun.setLong(2, scenario) + + val affectedRows = startRun.executeUpdate() + check(affectedRows != 0) + } + + /** + * Finish a run. + */ + fun finishRun(scenario: Long, run: Int, hasFailed: Boolean) { + finishRun.setString(1, if (hasFailed) "fail" else "ok") + finishRun.setInt(2, run) + finishRun.setLong(3, scenario) + + val affectedRows = finishRun.executeUpdate() + check(affectedRows != 0) + } + + /** + * Obtain the latest identifier of the specified statement. + */ + private val Statement.latestId: Long + get() { + val rs = generatedKeys + return try { + check(rs.next()) + rs.getLong(1) + } finally { + rs.close() + } + } + + override fun close() { + conn.close() + } +} |
