diff options
Diffstat (limited to 'opendc')
45 files changed, 1331 insertions, 1320 deletions
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 2ba07554..46d99564 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -31,27 +31,26 @@ plugins { } application { - mainClassName = "com.atlarge.opendc.experiments.sc20.ExperimentRunnerCliKt" - applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M") + mainClassName = "com.atlarge.opendc.experiments.sc20.MainKt" + applicationDefaultJvmArgs = listOf("-Xms2500M") } dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(kotlin("stdlib")) + implementation("com.github.ajalt:clikt:2.6.0") + implementation("me.tongfei:progressbar:0.8.1") implementation("io.github.microutils:kotlin-logging:1.7.9") + implementation("org.apache.parquet:parquet-avro:1.11.0") implementation("org.apache.hadoop:hadoop-client:3.2.1") { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } - implementation("com.zaxxer:HikariCP:3.4.5") - implementation("de.bytefish.pgbulkinsert:pgbulkinsert-core:5.1.0") - implementation("de.bytefish.pgbulkinsert:pgbulkinsert-rowwriter:5.1.0") - implementation("me.tongfei:progressbar:0.8.1") + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") - runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql deleted file mode 100644 index 92cb5d1f..00000000 --- a/opendc/opendc-experiments-sc20/schema.sql +++ /dev/null @@ -1,141 +0,0 @@ --- An experiment represents a collection of portfolios. 
-DROP TABLE IF EXISTS experiments CASCADE; -CREATE TABLE experiments -( - id BIGSERIAL PRIMARY KEY NOT NULL, - creation_time TIMESTAMP NOT NULL DEFAULT (now()) -); - --- A portfolio represents a collection of scenarios tested. -DROP TABLE IF EXISTS portfolios CASCADE; -CREATE TABLE portfolios -( - id BIGSERIAL PRIMARY KEY NOT NULL, - experiment_id BIGINT NOT NULL, - name TEXT NOT NULL, - - FOREIGN KEY (experiment_id) REFERENCES experiments (id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - --- A scenario represents a single point in the design space (a unique combination of parameters) -DROP TABLE IF EXISTS scenarios CASCADE; -CREATE TABLE scenarios -( - id BIGSERIAL PRIMARY KEY NOT NULL, - portfolio_id BIGINT NOT NULL, - repetitions INTEGER NOT NULL, - topology TEXT NOT NULL, - workload_name TEXT NOT NULL, - workload_fraction DOUBLE PRECISION NOT NULL, - allocation_policy TEXT NOT NULL, - failure_frequency DOUBLE PRECISION NOT NULL, - interference BOOLEAN NOT NULL, - - FOREIGN KEY (portfolio_id) REFERENCES portfolios (id) - ON DELETE CASCADE - ON UPDATE CASCADE - -); - -DROP TYPE IF EXISTS run_state CASCADE; -CREATE TYPE run_state AS ENUM ('wait', 'active', 'fail', 'ok'); - --- An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the --- same set of parameters. 
-DROP TABLE IF EXISTS runs CASCADE; -CREATE TABLE runs -( - id INTEGER NOT NULL, - scenario_id BIGINT NOT NULL, - seed INTEGER NOT NULL, - state run_state NOT NULL DEFAULT 'wait'::run_state, - submit_time TIMESTAMP NOT NULL DEFAULT (now()), - start_time TIMESTAMP DEFAULT (NULL), - end_time TIMESTAMP DEFAULT (NULL), - - PRIMARY KEY (scenario_id, id), - FOREIGN KEY (scenario_id) REFERENCES scenarios (id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - --- Metrics of the hypervisors reported per slice -DROP TABLE IF EXISTS host_metrics CASCADE; -CREATE TABLE host_metrics -( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - host_id TEXT NOT NULL, - state TEXT NOT NULL, - timestamp TIMESTAMP NOT NULL, - duration BIGINT NOT NULL, - vm_count INTEGER NOT NULL, - requested_burst BIGINT NOT NULL, - granted_burst BIGINT NOT NULL, - overcommissioned_burst BIGINT NOT NULL, - interfered_burst BIGINT NOT NULL, - cpu_usage DOUBLE PRECISION NOT NULL, - cpu_demand DOUBLE PRECISION NOT NULL, - power_draw DOUBLE PRECISION NOT NULL, - - FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - -DROP INDEX IF EXISTS host_metrics_idx; -CREATE INDEX host_metrics_idx ON host_metrics (scenario_id, run_id, timestamp, host_id); - --- Metrics of the VMs reported per slice -DROP TABLE IF EXISTS vm_metrics CASCADE; -CREATE TABLE vm_metrics -( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - vm_id TEXT NOT NULL, - host_id TEXT NOT NULL, - state TEXT NOT NULL, - timestamp TIMESTAMP NOT NULL, - duration BIGINT NOT NULL, - requested_burst BIGINT NOT NULL, - granted_burst BIGINT NOT NULL, - overcommissioned_burst BIGINT NOT NULL, - interfered_burst BIGINT NOT NULL, - cpu_usage DOUBLE PRECISION NOT NULL, - cpu_demand DOUBLE PRECISION NOT NULL, - - FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) - ON DELETE CASCADE - ON UPDATE CASCADE 
-); - -DROP INDEX IF EXISTS vm_metrics_idx; -CREATE INDEX vm_metrics_idx ON vm_metrics (scenario_id, run_id, timestamp, vm_id); - --- Metrics of the provisioner reported per change -DROP TABLE IF EXISTS provisioner_metrics CASCADE; -CREATE TABLE provisioner_metrics -( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - timestamp TIMESTAMP NOT NULL, - host_total_count INTEGER NOT NULL, - host_available_count INTEGER NOT NULL, - vm_total_count INTEGER NOT NULL, - vm_active_count INTEGER NOT NULL, - vm_inactive_count INTEGER NOT NULL, - vm_waiting_count INTEGER NOT NULL, - vm_failed_count INTEGER NOT NULL, - - FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - -DROP INDEX IF EXISTS provisioner_metrics_idx; -CREATE INDEX provisioner_metrics_idx ON provisioner_metrics (scenario_id, run_id, timestamp); diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt deleted file mode 100644 index e6fd504e..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ /dev/null @@ -1,216 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader -import me.tongfei.progressbar.ProgressBar -import mu.KotlinLogging -import java.io.Closeable -import java.io.File -import java.util.concurrent.ExecutorCompletionService -import java.util.concurrent.Executors -import javax.sql.DataSource - -/** - * The logger for the experiment runner. - */ -private val logger = KotlinLogging.logger {} - -/** - * The experiment runner is responsible for orchestrating the simulation runs of an experiment. - * - * @param portfolios The portfolios to consider. - * @param ds The data source to write the experimental results to. 
- */ -public class ExperimentRunner( - private val portfolios: List<Portfolio>, - private val ds: DataSource, - private val reporterProvider: ExperimentReporterProvider, - private val environmentPath: File, - private val tracePath: File, - private val performanceInterferenceModel: PerformanceInterferenceModel?, - private val parallelism: Int = Runtime.getRuntime().availableProcessors() -) : Closeable { - /** - * The database helper to write the execution plan. - */ - private val helper = DatabaseHelper(ds.connection) - - /** - * The experiment identifier. - */ - private var experimentId = -1L - - /** - * The mapping of portfolios to their ids. - */ - private val portfolioIds = mutableMapOf<Portfolio, Long>() - - /** - * The mapping of scenarios to their ids. - */ - private val scenarioIds = mutableMapOf<Scenario, Long>() - - init { - reporterProvider.init(ds) - } - - /** - * Create an execution plan - */ - private fun createPlan(): List<Run> { - val runs = mutableListOf<Run>() - - for (portfolio in portfolios) { - val portfolioId = helper.persist(portfolio, experimentId) - portfolioIds[portfolio] = portfolioId - var scenarios = 0 - var runCount = 0 - - for (scenario in portfolio.scenarios) { - val scenarioId = helper.persist(scenario, portfolioId) - scenarioIds[scenario] = scenarioId - scenarios++ - - for (run in scenario.runs) { - helper.persist(run, scenarioId) - runCount++ - runs.add(run) - } - } - - logger.info { "Portfolio $portfolioId: ${portfolio.name} ($scenarios scenarios, $runCount runs total)" } - } - - return runs - } - - /** - * The raw parquet trace readers that are shared across simulations. - */ - private val rawTraceReaders = mutableMapOf<String, Sc20RawParquetTraceReader>() - - /** - * Create a trace reader for the specified trace. 
- */ - private fun createTraceReader( - name: String, - performanceInterferenceModel: PerformanceInterferenceModel?, - run: Run - ): TraceReader<VmWorkload> { - val raw = rawTraceReaders.getValue(name) - return Sc20ParquetTraceReader( - raw, - performanceInterferenceModel, - run - ) - } - - /** - * Create the environment reader for the specified environment. - */ - private fun createEnvironmentReader(name: String): EnvironmentReader { - return Sc20ClusterEnvironmentReader(File(environmentPath, "$name.txt")) - } - - /** - * Run the portfolios. - */ - @OptIn(ExperimentalStdlibApi::class) - public fun run() { - experimentId = helper.createExperiment() - logger.info { "Creating execution plan for experiment $experimentId" } - - val plan = createPlan() - val total = plan.size - val completionService = ExecutorCompletionService<Unit>(Executors.newCachedThreadPool()) - val pb = ProgressBar("Experiment", total.toLong()) - - var running = 0 - - for (run in plan) { - if (running >= parallelism) { - completionService.take() - running-- - } - - val scenarioId = scenarioIds[run.scenario]!! 
- - rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name -> - logger.info { "Loading trace $name" } - Sc20RawParquetTraceReader(File(tracePath, name)) - } - - completionService.submit { - pb.extraMessage = "($scenarioId, ${run.id}) START" - - var hasFailed = false - synchronized(helper) { - helper.startRun(scenarioId, run.id) - } - - try { - val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) - val traceReader = - createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) - val environmentReader = createEnvironmentReader(run.scenario.topology.name) - - try { - run.scenario(run, reporter, environmentReader, traceReader) - } finally { - reporter.close() - } - - pb.extraMessage = "($scenarioId, ${run.id}) OK" - } catch (e: Throwable) { - logger.error("A run has failed", e) - hasFailed = true - pb.extraMessage = "($scenarioId, ${run.id}) FAIL" - } finally { - synchronized(helper) { - helper.finishRun(scenarioId, run.id, hasFailed = hasFailed) - } - - pb.step() - } - } - - running++ - } - } - - override fun close() { - reporterProvider.close() - helper.close() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 631c1085..e17a145c 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -24,20 +24,19 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider -import 
com.atlarge.opendc.experiments.sc20.reporter.ParquetHostMetricsWriter -import com.atlarge.opendc.experiments.sc20.reporter.ParquetProvisionerMetricsWriter -import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter -import com.atlarge.opendc.experiments.sc20.reporter.PostgresProvisionerMetricsWriter +import com.atlarge.opendc.experiments.sc20.experiment.Experiment +import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.Portfolio +import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ThreadPoolExperimentScheduler +import com.atlarge.opendc.experiments.sc20.runner.internal.DefaultExperimentRunner import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.groups.required import com.github.ajalt.clikt.parameters.options.convert import com.github.ajalt.clikt.parameters.options.default import com.github.ajalt.clikt.parameters.options.defaultLazy @@ -47,11 +46,9 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int -import com.zaxxer.hikari.HikariDataSource import mu.KotlinLogging import java.io.File import java.io.InputStream -import 
javax.sql.DataSource /** * The logger for this experiment. @@ -63,11 +60,6 @@ private val logger = KotlinLogging.logger {} */ class ExperimentCli : CliktCommand(name = "sc20-experiment") { /** - * The JDBC connection url to use. - */ - private val jdbcUrl by option("--jdbc-url", help = "JDBC connection url").required() - - /** * The path to the directory where the topology descriptions are located. */ private val environmentPath by option("--environment-path", help = "path to the environment directory") @@ -99,22 +91,14 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { .default(emptyMap()) /** - * The type of reporter to use. - */ - private val reporter by option().groupChoice( - "parquet" to Reporter.Parquet(), - "postgres" to Reporter.Postgres() - ).required() - - /** * The selected portfolios to run. */ private val portfolios by option("--portfolio") .choice( - "hor-ver" to HorVerPortfolio, - "more-velocity" to MoreVelocityPortfolio, - "more-hpc" to MoreHpcPortfolio, - "operational-phenomena" to OperationalPhenomenaPortfolio, + "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio, + "more-velocity" to ({ experiment, i -> MoreVelocityPortfolio(experiment, i) }), + "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) }, + "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) }, ignoreCase = true ) .multiple() @@ -122,93 +106,47 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { /** * The maximum number of worker threads to use. */ - private val workerParallelism by option("--worker-parallelism") + private val parallelism by option("--parallelism") .int() .default(Runtime.getRuntime().availableProcessors()) /** - * The maximum number of host writer threads to use. 
- */ - private val hostWriterParallelism by option("--host-writer-parallelism") - .int() - .default(8) - - /** - * The maximum number of provisioner writer threads to use. - */ - private val provisionerWriterParallelism by option("--provisioner-writer-parallelism") - .int() - .default(1) - - /** * The buffer size for writing results. */ private val bufferSize by option("--buffer-size") .int() .default(4096) - override fun run() { - val ds = HikariDataSource() - ds.maximumPoolSize = Runtime.getRuntime().availableProcessors() * 3 - ds.jdbcUrl = jdbcUrl - ds.addDataSourceProperty("reWriteBatchedInserts", "true") + /** + * The path to the output directory. + */ + private val output by option("-O", "--output", help = "path to the output directory") + .file(canBeFile = false) + .defaultLazy { File("data") } - reporter.bufferSize = bufferSize - reporter.hostParallelism = hostWriterParallelism - reporter.provisionerParallelism = provisionerWriterParallelism + override fun run() { + logger.info { "Constructing performance interference model" } val performanceInterferenceModel = performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } - val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, workerParallelism) + logger.info { "Creating experiment descriptor" } + val descriptor = object : Experiment(environmentPath, tracePath, output, performanceInterferenceModel, vmPlacements, bufferSize) { + private val descriptor = this + override val children: Sequence<ExperimentDescriptor> = sequence { + for ((i, producer) in portfolios.withIndex()) { + yield(producer(descriptor, i)) + } + } + } + logger.info { "Starting experiment runner [parallelism=$parallelism]" } + val scheduler = ThreadPoolExperimentScheduler(parallelism) + val runner = DefaultExperimentRunner(scheduler) try { - runner.run() + runner.execute(descriptor, ConsoleExperimentReporter()) } finally { - runner.close() - ds.close() - } 
- } -} - -/** - * An option for specifying the type of reporter to use. - */ -internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider { - var bufferSize = 4096 - var hostParallelism = 8 - var provisionerParallelism = 1 - - class Parquet : Reporter("Options for reporting using Parquet") { - private val path by option("--parquet-directory", help = "path to where the output should be stored") - .file() - .defaultLazy { File("data") } - - override fun createReporter(scenario: Long, run: Int): ExperimentReporter { - val hostWriter = ParquetHostMetricsWriter(File(path, "$scenario-$run-host.parquet"), bufferSize) - val provisionerWriter = ParquetProvisionerMetricsWriter(File(path, "$scenario-$run-provisioner.parquet"), bufferSize) - return ExperimentParquetReporter(scenario, run, hostWriter, provisionerWriter) - } - - override fun close() {} - } - - class Postgres : Reporter("Options for reporting using PostgreSQL") { - lateinit var hostWriter: PostgresHostMetricsWriter - lateinit var provisionerWriter: PostgresProvisionerMetricsWriter - - override fun init(ds: DataSource) { - hostWriter = PostgresHostMetricsWriter(ds, hostParallelism, bufferSize) - provisionerWriter = PostgresProvisionerMetricsWriter(ds, provisionerParallelism, bufferSize) - } - - override fun createReporter(scenario: Long, run: Int): ExperimentReporter { - return ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter) - } - - override fun close() { - hostWriter.close() - provisionerWriter.close() + scheduler.close() } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt new file mode 100644 index 00000000..5feb5917 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt @@ -0,0 +1,78 @@ +/* + * MIT 
License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetRunEventWriter +import java.io.File + +/** + * The global configuration of the experiment. + * + * @param environments The path to the topologies directory. + * @param traces The path to the traces directory. + * @param output The output directory. 
+ * @param performanceInterferenceModel The optional performance interference model that has been specified. + * @param vmPlacements Original VM placement in the trace. + * @param bufferSize The buffer size of the event reporters. + */ +public abstract class Experiment( + val environments: File, + val traces: File, + val output: File, + val performanceInterferenceModel: PerformanceInterferenceModel?, + val vmPlacements: Map<String, String>, + val bufferSize: Int +) : ContainerExperimentDescriptor() { + override val parent: ExperimentDescriptor? = null + + override suspend fun invoke(context: ExperimentExecutionContext) { + val writer = ParquetRunEventWriter(File(output, "experiments.parquet"), bufferSize) + try { + val listener = object : ExperimentExecutionListener by context.listener { + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + if (descriptor is Run) { + writer.write(RunEvent(descriptor, System.currentTimeMillis())) + } + + context.listener.descriptorRegistered(descriptor) + } + } + + val newContext = object : ExperimentExecutionContext by context { + override val listener: ExperimentExecutionListener = listener + } + + super.invoke(newContext) + } finally { + writer.close() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 1bc463c3..32dc87ef 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -22,7 +22,7 @@ * SOFTWARE. 
*/ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.experiment import com.atlarge.odcsim.Domain import com.atlarge.odcsim.simulationContext @@ -41,7 +41,7 @@ import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader @@ -83,7 +83,13 @@ suspend fun createFailureDomain( for (node in bareMetalProvisioner.nodes()) { val cluster = node.metadata[NODE_CLUSTER] as String val injector = - injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } + injectors.getOrPut(cluster) { + createFaultInjector( + simulationContext.domain, + random, + failureInterval + ) + } injector.enqueue(node.metadata["driver"] as FailureDomain) } } @@ -143,7 +149,7 @@ suspend fun createProvisioner( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) { +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { val domain = simulationContext.domain val clock = simulationContext.clock val hypervisors = scheduler.drivers() @@ -151,13 +157,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. 
- reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) + monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) hypervisor.server.events .onEach { event -> val time = clock.millis() when (event) { is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(time, hypervisor, event.server) + monitor.reportHostStateChange(time, hypervisor, event.server) } } } @@ -165,7 +171,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( + is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, @@ -182,7 +188,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex val driver = hypervisor.server.services[BareMetalDriver.Key] driver.powerDraw - .onEach { reporter.reportPowerConsumption(hypervisor.server, it) } + .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } .launchIn(domain) } @@ -190,7 +196,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex .onEach { event -> when (event) { is VirtProvisioningEvent.MetricsAvailable -> - reporter.reportProvisionerMetrics(clock.millis(), event) + monitor.reportProvisionerMetrics(clock.millis(), event) } } .launchIn(domain) @@ -199,7 +205,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex /** * Process the trace. 
*/ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: ExperimentReporter, vmPlacements: Map<String, String> = emptyMap()) { +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { val domain = simulationContext.domain try { @@ -239,7 +245,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP val time = simulationContext.clock.millis() if (it is ServerEvent.StateChanged) { - reporter.reportVmStateChange(time, it.server) + monitor.reportVmStateChange(time, it.server) } delay(1) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt new file mode 100644 index 00000000..6a40f5fb --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt @@ -0,0 +1,90 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor + +/** + * A portfolio represents a collection of scenarios are tested. + */ +public abstract class Portfolio( + override val parent: Experiment, + val id: Int, + val name: String +) : ContainerExperimentDescriptor() { + /** + * The topologies to consider. + */ + protected abstract val topologies: List<Topology> + + /** + * The workloads to consider. + */ + protected abstract val workloads: List<Workload> + + /** + * The operational phenomenas to consider. + */ + protected abstract val operationalPhenomenas: List<OperationalPhenomena> + + /** + * The allocation policies to consider. + */ + protected abstract val allocationPolicies: List<String> + + /** + * The number of repetitions to perform. + */ + open val repetitions: Int = 32 + + /** + * Resolve the children of this container. 
+ */ + override val children: Sequence<Scenario> = sequence { + var id = 0 + for (topology in topologies) { + for (workload in workloads) { + for (operationalPhenomena in operationalPhenomenas) { + for (allocationPolicy in allocationPolicies) { + yield( + Scenario( + this@Portfolio, + id++, + repetitions, + topology, + workload, + allocationPolicy, + operationalPhenomena + ) + ) + } + } + } + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt index 668304b6..4800ceba 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -22,42 +22,13 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 - -abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) { - abstract val topologies: List<Topology> - abstract val workloads: List<Workload> - abstract val operationalPhenomena: List<Pair<Double, Boolean>> - abstract val allocationPolicies: List<String> - - open val repetitions = 8 - - override val scenarios: Sequence<Scenario> = sequence { - for (topology in topologies) { - for (workload in workloads) { - for ((failureFrequency, hasInterference) in operationalPhenomena) { - for (allocationPolicy in allocationPolicies) { - yield( - Scenario( - this@AbstractSc20Portfolio, - repetitions, - topology, - workload, - allocationPolicy, - failureFrequency, - hasInterference - ) - ) - } - } - } - } - } -} +package com.atlarge.opendc.experiments.sc20.experiment -private val defaultFailureInterval = 24.0 * 7 +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import 
com.atlarge.opendc.experiments.sc20.experiment.model.Workload -object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { +public class HorVerPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "horizontal_vs_vertical") { override val topologies = listOf( Topology("base"), Topology("rep-vol-hor-hom"), @@ -77,8 +48,8 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { Workload("solvinity", 1.0) ) - override val operationalPhenomena = listOf( - defaultFailureInterval to true + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) ) override val allocationPolicies = listOf( @@ -86,7 +57,7 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { ) } -object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") { +public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_velocity") { override val topologies = listOf( Topology("base"), Topology("rep-vel-ver-hom"), @@ -102,8 +73,8 @@ object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") { Workload("solvinity", 1.0) ) - override val operationalPhenomena = listOf( - defaultFailureInterval to true + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) ) override val allocationPolicies = listOf( @@ -111,7 +82,7 @@ object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") { ) } -object MoreHpcPortfolio : AbstractSc20Portfolio("more_hpc") { +public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") { override val topologies = listOf( Topology("base"), Topology("exp-vol-hor-hom"), @@ -126,8 +97,8 @@ object MoreHpcPortfolio : AbstractSc20Portfolio("more_hpc") { Workload("solvinity", 1.0) ) - override val operationalPhenomena = listOf( - defaultFailureInterval to true + override val operationalPhenomenas = listOf( + 
OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) ) override val allocationPolicies = listOf( @@ -135,7 +106,7 @@ object MoreHpcPortfolio : AbstractSc20Portfolio("more_hpc") { ) } -object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("operational_phenomena") { +public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "operational_phenomena") { override val topologies = listOf( Topology("base") ) @@ -146,11 +117,11 @@ object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("operational_phenom Workload("solvinity", 1.0) ) - override val operationalPhenomena = listOf( - defaultFailureInterval to true, - 0.0 to true, - defaultFailureInterval to false, - defaultFailureInterval to true + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) ) override val allocationPolicies = listOf( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 457255cb..6d53fd17 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -22,24 +22,26 @@ * SOFTWARE. 
*/ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.experiment import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor +import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import mu.KotlinLogging +import java.io.File import java.util.ServiceLoader import kotlin.random.Random @@ -54,36 +56,19 @@ private val logger = KotlinLogging.logger {} private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() /** - * A scenario represents a single point in the design space (a unique combination of parameters). 
+ * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the + * same set of parameters. */ -public class Scenario( - val portfolio: Portfolio, - val repetitions: Int, - val topology: Topology, - val workload: Workload, - val allocationPolicy: String, - val failureFrequency: Double, - val hasInterference: Boolean -) { - /** - * The runs this scenario consists of. - */ - public val runs: Sequence<Run> = sequence { - repeat(repetitions) { i -> - yield(Run(this@Scenario, i, i)) - } - } - - /** - * Perform a single run of this scenario. - */ - public operator fun invoke(run: Run, reporter: ExperimentReporter, environment: EnvironmentReader, trace: TraceReader<VmWorkload>) { - val system = provider("experiment-${run.id}") +public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() { + override suspend fun invoke(context: ExperimentExecutionContext) { + val experiment = parent.parent.parent + val system = provider("experiment-$id") val root = system.newDomain("root") - val seeder = Random(run.seed) + val seeder = Random(seed) + val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = when (this.allocationPolicy) { + val allocationPolicy = when (parent.allocationPolicy) { "mem" -> AvailableMemoryAllocationPolicy() "mem-inv" -> AvailableMemoryAllocationPolicy(true) "core-mem" -> AvailableCoreMemoryAllocationPolicy() @@ -94,21 +79,49 @@ public class Scenario( "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) "replay" -> ReplayAllocationPolicy(emptyMap()) - else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") + else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}") } + @Suppress("UNCHECKED_CAST") + val rawTraceReaders 
= context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader> + val raw = synchronized(rawTraceReaders) { + val name = parent.workload.name + rawTraceReaders.computeIfAbsent(name) { + logger.info { "Loading trace $name" } + Sc20RawParquetTraceReader(File(experiment.traces, name)) + } + } + val trace = Sc20ParquetTraceReader(raw, experiment.performanceInterferenceModel, this) + + val monitor = ParquetExperimentMonitor(this) + root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environment, allocationPolicy) + val (bareMetalProvisioner, scheduler) = createProvisioner( + root, + environment, + allocationPolicy + ) - val failureDomain = if (failureFrequency > 0) { + val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") - createFailureDomain(seeder.nextInt(), failureFrequency, bareMetalProvisioner, chan) + createFailureDomain( + seeder.nextInt(), + parent.operationalPhenomena.failureFrequency, + bareMetalProvisioner, + chan + ) } else { null } - attachMonitor(scheduler, reporter) - processTrace(trace, scheduler, chan, reporter, emptyMap()) + attachMonitor(scheduler, monitor) + processTrace( + trace, + scheduler, + chan, + monitor, + experiment.vmPlacements + ) logger.debug("SUBMIT=${scheduler.submittedVms}") logger.debug("FAIL=${scheduler.unscheduledVms}") @@ -120,9 +133,11 @@ public class Scenario( scheduler.terminate() } - runBlocking { + try { system.run() + } finally { system.terminate() + monitor.close() } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt new file mode 100644 index 00000000..98bc7fc2 --- /dev/null +++ 
b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor + +/** + * A scenario represents a single point in the design space (a unique combination of parameters). 
+ */ +public class Scenario( + override val parent: Portfolio, + val id: Int, + val repetitions: Int, + val topology: Topology, + val workload: Workload, + val allocationPolicy: String, + val operationalPhenomena: OperationalPhenomena +) : ContainerExperimentDescriptor() { + override val children: Sequence<ExperimentDescriptor> = sequence { + repeat(repetitions) { i -> yield(Run(this@Scenario, i, i)) } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt new file mode 100644 index 00000000..af99df84 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.model + +/** + * Operation phenomena during experiments. + * + * @param failureFrequency The average time between failures in hours. + * @param hasInterference A flag to enable performance interference between VMs. + */ +public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt index d2be9599..3ed71e09 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.experiment.model /** * The datacenter topology on which we test the workload. diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt index 4ab5ec8c..2dbdf570 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.experiment.model /** * A workload that is considered for a scenario. 
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt index 049035cc..1f674f00 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20.reporter +package com.atlarge.opendc.experiments.sc20.experiment.monitor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver @@ -30,9 +30,9 @@ import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import java.io.Closeable /** - * A reporter used by experiments to report metrics. + * A monitor watches the events of an experiment. */ -interface ExperimentReporter : Closeable { +interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a VM changes. */ diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index 9426933e..33978aab 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -22,23 +22,38 @@ * SOFTWARE. 
*/ -package com.atlarge.opendc.experiments.sc20.reporter +package com.atlarge.opendc.experiments.sc20.experiment.monitor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent +import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent +import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter import mu.KotlinLogging +import java.io.File +/** + * The logger instance to use. + */ private val logger = KotlinLogging.logger {} -class ExperimentParquetReporter( - val scenario: Long, - val run: Int, - val hostWriter: ParquetHostMetricsWriter, - val provisionerWriter: ParquetProvisionerMetricsWriter -) : - ExperimentReporter { +/** + * An [ExperimentMonitor] that logs the events to a Parquet file. 
+ */ +class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { + private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}" + private val hostWriter = ParquetHostEventWriter( + File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"), + run.parent.parent.parent.bufferSize + ) + private val provisionerWriter = ParquetProvisionerEventWriter( + File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"), + run.parent.parent.parent.bufferSize + ) private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() override fun reportVmStateChange(time: Long, server: Server) {} @@ -92,7 +107,7 @@ class ExperimentParquetReporter( duration: Long ) { hostWriter.write( - scenario, run, HostMetrics( + HostEvent( time, duration, hostServer, @@ -110,9 +125,7 @@ class ExperimentParquetReporter( override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { provisionerWriter.write( - scenario, - run, - ProvisionerMetrics( + ProvisionerEvent( time, event.totalHostCount, event.availableHostCount, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt new file mode 100644 index 00000000..f59402d5 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 
+ * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import me.tongfei.progressbar.ProgressBar +import me.tongfei.progressbar.ProgressBarBuilder + +/** + * A reporter that reports the experiment progress to the console. + */ +public class ConsoleExperimentReporter : ExperimentExecutionListener { + /** + * The active [Run]s. + */ + private val runs: MutableSet<Run> = mutableSetOf() + + /** + * The total number of runs. + */ + private var total = 0 + + /** + * The progress bar to keep track of the progress. 
+ */ + private val pb: ProgressBar = ProgressBarBuilder() + .setTaskName("") + .setInitialMax(1) + .build() + + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + if (descriptor is Run) { + runs += descriptor + pb.maxHint((++total).toLong()) + } + } + + override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) { + if (descriptor is Run) { + runs -= descriptor + + pb.stepTo(total - runs.size.toLong()) + if (runs.isEmpty()) { + pb.close() + } + } + } + + override fun executionStarted(descriptor: ExperimentDescriptor) {} +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt deleted file mode 100644 index 33839191..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import java.io.File - -private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("scenario_id").type().longType().noDefault() - .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .endRecord() - -public class ParquetHostMetricsWriter(path: File, batchSize: Int) : - ParquetMetricsWriter<HostMetrics>(path, schema, batchSize) { - - override fun persist(action: Action.Write<HostMetrics>, row: GenericData.Record) { - row.put("scenario_id", action.scenario) - row.put("run_id", action.run) - row.put("host_id", action.metrics.host.name) - row.put("state", action.metrics.host.state.name) - row.put("timestamp", action.metrics.time) - row.put("duration", action.metrics.duration) - row.put("vm_count", action.metrics.vmCount) - row.put("requested_burst", action.metrics.requestedBurst) - row.put("granted_burst", 
action.metrics.grantedBurst) - row.put("overcommissioned_burst", action.metrics.overcommissionedBurst) - row.put("interfered_burst", action.metrics.interferedBurst) - row.put("cpu_usage", action.metrics.cpuUsage) - row.put("cpu_demand", action.metrics.cpuDemand) - row.put("power_draw", action.metrics.powerDraw) - } - - override fun toString(): String = "host-writer" -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt deleted file mode 100644 index 0c74b23e..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import java.io.File - -private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("scenario_id").type().longType().noDefault() - .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() - .endRecord() - -public class ParquetProvisionerMetricsWriter(path: File, batchSize: Int) : - ParquetMetricsWriter<ProvisionerMetrics>(path, schema, batchSize) { - - override fun persist(action: Action.Write<ProvisionerMetrics>, row: GenericData.Record) { - row.put("scenario_id", action.scenario) - row.put("run_id", action.run) - row.put("timestamp", action.metrics.time) - row.put("host_total_count", action.metrics.totalHostCount) - row.put("host_available_count", action.metrics.availableHostCount) - row.put("vm_total_count", action.metrics.totalVmCount) - row.put("vm_active_count", action.metrics.activeVmCount) - row.put("vm_inactive_count", action.metrics.inactiveVmCount) - row.put("vm_waiting_count", action.metrics.waitingVmCount) - row.put("vm_failed_count", action.metrics.failedVmCount) - } - - override fun toString(): 
String = "host-writer" -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt deleted file mode 100644 index 595b9777..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt +++ /dev/null @@ -1,128 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent -import mu.KotlinLogging - -private val logger = KotlinLogging.logger {} - -class ExperimentPostgresReporter( - val scenario: Long, - val run: Int, - val hostWriter: PostgresHostMetricsWriter, - val provisionerWriter: PostgresProvisionerMetricsWriter -) : ExperimentReporter { - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - - override fun reportVmStateChange(time: Long, server: Server) {} - - override fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) { - val lastServerState = lastServerStates[server] - logger.debug("Host ${server.uid} changed state ${server.state} [$time]") - - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = time - lastServerState.second - reportHostSlice( - time, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - duration - ) - - lastServerStates.remove(server) - lastPowerConsumption.remove(server) - } else { - lastServerStates[server] = Pair(server.state, time) - } - } - - private val lastPowerConsumption = mutableMapOf<Server, Double>() - - override fun reportPowerConsumption(host: Server, draw: Double) { - lastPowerConsumption[host] = draw - } - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long - ) { - hostWriter.write( - scenario, run, HostMetrics( - time, - duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 
200.0 - ) - ) - } - - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { - provisionerWriter.write( - scenario, - run, - ProvisionerMetrics( - time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount - ) - ) - } - - override fun close() {} -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt deleted file mode 100644 index 57e665ae..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import de.bytefish.pgbulkinsert.row.SimpleRow -import de.bytefish.pgbulkinsert.row.SimpleRowWriter -import javax.sql.DataSource - -private val table: SimpleRowWriter.Table = SimpleRowWriter.Table( - "host_metrics", - *arrayOf( - "scenario_id", - "run_id", - "host_id", - "state", - "timestamp", - "duration", - "vm_count", - "requested_burst", - "granted_burst", - "overcommissioned_burst", - "interfered_burst", - "cpu_usage", - "cpu_demand", - "power_draw" - ) -) - -/** - * A [PostgresMetricsWriter] for persisting [HostMetrics]. - */ -public class PostgresHostMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : - PostgresMetricsWriter<HostMetrics>(ds, table, parallelism, batchSize) { - - override fun persist(action: Action.Write<HostMetrics>, row: SimpleRow) { - row.setLong("scenario_id", action.scenario) - row.setInteger("run_id", action.run) - row.setText("host_id", action.metrics.host.name) - row.setText("state", action.metrics.host.state.name) - row.setLong("timestamp", action.metrics.time) - row.setLong("duration", action.metrics.duration) - row.setInteger("vm_count", action.metrics.vmCount) - row.setLong("requested_burst", action.metrics.requestedBurst) - row.setLong("granted_burst", action.metrics.grantedBurst) - row.setLong("overcommissioned_burst", action.metrics.overcommissionedBurst) - row.setLong("interfered_burst", action.metrics.interferedBurst) - row.setDouble("cpu_usage", action.metrics.cpuUsage) - row.setDouble("cpu_demand", action.metrics.cpuDemand) - row.setDouble("power_draw", action.metrics.powerDraw) - } - - override fun toString(): String = "host-writer" -} diff --git 
a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt deleted file mode 100644 index bee01e51..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt +++ /dev/null @@ -1,124 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import de.bytefish.pgbulkinsert.row.SimpleRow -import de.bytefish.pgbulkinsert.row.SimpleRowWriter -import org.postgresql.PGConnection -import java.io.Closeable -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import javax.sql.DataSource - -/** - * The experiment writer is a separate thread that is responsible for writing the results to the - * database. - */ -public abstract class PostgresMetricsWriter<T>( - private val ds: DataSource, - private val table: SimpleRowWriter.Table, - private val parallelism: Int = 8, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue<Action> = ArrayBlockingQueue(parallelism * bufferSize) - - /** - * The executor service to use. - */ - private val executorService = Executors.newFixedThreadPool(parallelism) - - /** - * Write the specified metrics to the database. - */ - public fun write(scenario: Long, run: Int, metrics: T) { - queue.put(Action.Write(scenario, run, metrics)) - } - - /** - * Signal the writer to stop. - */ - public override fun close() { - repeat(parallelism) { - queue.put(Action.Stop) - } - executorService.shutdown() - executorService.awaitTermination(5, TimeUnit.MINUTES) - } - - /** - * Persist the specified metrics to the given [row]. - */ - public abstract fun persist(action: Action.Write<T>, row: SimpleRow) - - init { - repeat(parallelism) { - executorService.submit { run() } - } - } - - /** - * Start the writer thread. 
- */ - override fun run() { - val conn = ds.connection - - val writer = SimpleRowWriter(table) - writer.open(conn.unwrap(PGConnection::class.java)) - - try { - - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> writer.startRow { - @Suppress("UNCHECKED_CAST") - persist(action as Action.Write<T>, it) - } - } - } - } finally { - writer.close() - conn.close() - } - } - - sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt deleted file mode 100644 index 17788112..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import de.bytefish.pgbulkinsert.row.SimpleRow -import de.bytefish.pgbulkinsert.row.SimpleRowWriter -import javax.sql.DataSource - -private val table: SimpleRowWriter.Table = SimpleRowWriter.Table( - "provisioner_metrics", - *arrayOf( - "scenario_id", - "run_id", - "timestamp", - "host_total_count", - "host_available_count", - "vm_total_count", - "vm_active_count", - "vm_inactive_count", - "vm_waiting_count", - "vm_failed_count" - ) -) - -/** - * A [PostgresMetricsWriter] for persisting [ProvisionerMetrics]. 
- */ -public class PostgresProvisionerMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : - PostgresMetricsWriter<ProvisionerMetrics>(ds, table, parallelism, batchSize) { - - override fun persist(action: Action.Write<ProvisionerMetrics>, row: SimpleRow) { - row.setLong("scenario_id", action.scenario) - row.setInteger("run_id", action.run) - row.setLong("timestamp", action.metrics.time) - row.setInteger("host_total_count", action.metrics.totalHostCount) - row.setInteger("host_available_count", action.metrics.availableHostCount) - row.setInteger("vm_total_count", action.metrics.totalVmCount) - row.setInteger("vm_active_count", action.metrics.activeVmCount) - row.setInteger("vm_inactive_count", action.metrics.inactiveVmCount) - row.setInteger("vm_waiting_count", action.metrics.waitingVmCount) - row.setInteger("vm_failed_count", action.metrics.failedVmCount) - } - - override fun toString(): String = "provisioner-writer" -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt new file mode 100644 index 00000000..dac32586 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission 
notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope + +/** + * An abstract [ExperimentDescriptor] specifically for containers. + */ +public abstract class ContainerExperimentDescriptor : ExperimentDescriptor() { + /** + * The child descriptors of this container. 
+ */ + public abstract val children: Sequence<ExperimentDescriptor> + + override val type: Type = Type.CONTAINER + + override suspend fun invoke(context: ExperimentExecutionContext) { + val materializedChildren = children.toList() + for (child in materializedChildren) { + context.listener.descriptorRegistered(child) + } + + supervisorScope { + for (child in materializedChildren) { + if (child.isTrial) { + launch { + val worker = context.scheduler.allocate() + context.listener.executionStarted(child) + try { + worker(child, context) + context.listener.executionFinished(child, ExperimentExecutionResult.Success) + } catch (e: Throwable) { + context.listener.executionFinished(child, ExperimentExecutionResult.Failed(e)) + } + } + } else { + launch { child(context) } + } + } + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt new file mode 100644 index 00000000..64b6b767 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt @@ -0,0 +1,81 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import java.io.Serializable + +/** + * An immutable description of an experiment in the **odcsim** simulation framework, which may be a single atomic trial + * or a composition of multiple trials. + * + * This class represents a dynamic tree-like structure where the children of the nodes are not known at instantiation + * since they might be generated dynamically. + */ +public abstract class ExperimentDescriptor : Serializable { + /** + * The parent of this descriptor, or `null` if it has no parent. + */ + public abstract val parent: ExperimentDescriptor? + + /** + * The type of descriptor. + */ + abstract val type: Type + + /** + * A flag to indicate that this descriptor is a root descriptor. + */ + public open val isRoot: Boolean + get() = parent == null + + /** + * A flag to indicate that this descriptor describes an experiment trial. + */ + val isTrial: Boolean + get() = type == Type.TRIAL + + /** + * Execute this [ExperimentDescriptor]. + * + * @param context The context to execute the descriptor in. + */ + public abstract suspend operator fun invoke(context: ExperimentExecutionContext) + + /** + * The types of experiment descriptors. + */ + enum class Type { + /** + * A composition of multiple experiment descriptions whose invocation happens on a single thread.
+ */ + CONTAINER, + + /** + * An invocation of a single scenario of an experiment whose invocation may happen on different threads. + */ + TRIAL + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt new file mode 100644 index 00000000..77f970fe --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener + +/** + * An [ExperimentRunner] facilitates discovery and execution of experiments. 
+ */ +public interface ExperimentRunner { + /** + * The unique identifier of this runner. + */ + val id: String + + /** + * The version of this runner. + */ + val version: String? + get() = null + + /** + * Execute the specified experiment represented as [ExperimentDescriptor]. + * + * @param root The experiment to execute. + * @param listener The listener to report events to. + */ + public fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt index b2151b16..cf05416a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt @@ -22,10 +22,11 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.runner /** - * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the - * same set of parameters. + * An abstract [ExperimentDescriptor] specifically for trials. 
*/ -public data class Run(val scenario: Scenario, val id: Int, val seed: Int) +public abstract class TrialExperimentDescriptor : ExperimentDescriptor() { + override val type: Type = Type.TRIAL +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt new file mode 100644 index 00000000..9a04c491 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +/** + * The execution context of an experiment. 
+ */ +public interface ExperimentExecutionContext { + /** + * The execution listener to use. + */ + public val listener: ExperimentExecutionListener + + /** + * The experiment scheduler to use. + */ + public val scheduler: ExperimentScheduler + + /** + * A cache for objects within a single runner. + */ + public val cache: MutableMap<Any?, Any?> +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt new file mode 100644 index 00000000..f6df0524 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor + +/** + * Listener to be notified of experiment execution events by experiment runners. + */ +interface ExperimentExecutionListener { + /** + * A method that is invoked when a new [ExperimentDescriptor] is registered. + */ + fun descriptorRegistered(descriptor: ExperimentDescriptor) + + /** + * A method that is invoked when the execution of a leaf or subtree of the experiment tree has finished, + * regardless of the outcome. + */ + fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) + + /** + * A method that is invoked when the execution of a leaf or subtree of the experiment tree is about to be started. + */ + fun executionStarted(descriptor: ExperimentDescriptor) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt index 8f42cdd4..057e1f92 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt @@ -22,19 +22,21 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20.reporter +package com.atlarge.opendc.experiments.sc20.runner.execution -import java.io.Closeable -import javax.sql.DataSource +import java.io.Serializable -interface ExperimentReporterProvider : Closeable { +/** + * The result of executing an experiment. + */ +public sealed class ExperimentExecutionResult : Serializable { /** - * Initialize the provider with the specified data source.
+ * The experiment executed successfully */ - public fun init(ds: DataSource) {} + public object Success : ExperimentExecutionResult() /** - * Create a reporter for a single run. + * The experiment failed during execution. */ - public fun createReporter(scenario: Long, run: Int): ExperimentReporter + public data class Failed(val throwable: Throwable) : ExperimentExecutionResult() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt new file mode 100644 index 00000000..0346a7f8 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import java.io.Closeable + +/** + * An interface for scheduling the execution of experiment trials over compute resources (threads/containers/vms) + */ +interface ExperimentScheduler : Closeable { + /** + * Allocate a [Worker] for executing an experiment trial. This method may suspend in case no resources are directly + * available at the moment. + * + * @return The available worker. + */ + suspend fun allocate(): ExperimentScheduler.Worker + + /** + * An isolated worker of an [ExperimentScheduler] that is responsible for executing a single experiment trial. + */ + interface Worker { + /** + * Dispatch the specified [ExperimentDescriptor] to execute some time in the future and return the results of + * the trial. + * + * @param descriptor The descriptor to execute. + * @param context The context to execute the descriptor in. + * @return The results of the experiment trial.
+ */ + suspend operator fun invoke( + descriptor: ExperimentDescriptor, + context: ExperimentExecutionContext + ): ExperimentExecutionResult + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt new file mode 100644 index 00000000..31632b8c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt @@ -0,0 +1,85 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
/**
 * An [ExperimentScheduler] that runs experiments using a local thread pool.
 *
 * The pool itself is a cached thread pool; the number of concurrently allocated workers is bounded by a
 * semaphore instead. [allocate] acquires a ticket and the returned worker releases it once its trial is done.
 *
 * @param parallelism The maximum amount of parallel workers (default is the number of available processors
 * plus one).
 */
class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().availableProcessors() + 1) : ExperimentScheduler {
    /**
     * The dispatcher, backed by a cached thread pool, on which the trials are executed.
     */
    private val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()

    /**
     * The tickets that bound the number of workers that can be allocated at the same time.
     */
    private val tickets = Semaphore(parallelism)

    /**
     * Allocate a worker, suspending until one of the [tickets] becomes available.
     *
     * @return A worker that releases its ticket when its trial has finished.
     */
    override suspend fun allocate(): ExperimentScheduler.Worker {
        tickets.acquire()
        return object : ExperimentScheduler.Worker {
            /**
             * Run [descriptor] on the thread pool and report whether it succeeded.
             *
             * @param descriptor The descriptor to execute.
             * @param context The context to execute the descriptor in.
             * @return [ExperimentExecutionResult.Success] when the descriptor completed normally, otherwise
             * [ExperimentExecutionResult.Failed] carrying the cause.
             * @throws kotlinx.coroutines.CancellationException if the execution was cancelled.
             */
            override suspend fun invoke(
                descriptor: ExperimentDescriptor,
                context: ExperimentExecutionContext
            ): ExperimentExecutionResult = supervisorScope {
                // The callbacks are launched in this supervisor scope (the caller's context) instead of being
                // invoked directly, so the original listener is not run on the pool thread that calls them.
                val listener =
                    object : ExperimentExecutionListener {
                        override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
                            launch { context.listener.descriptorRegistered(descriptor) }
                        }

                        override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) {
                            launch { context.listener.executionFinished(descriptor, result) }
                        }

                        override fun executionStarted(descriptor: ExperimentDescriptor) {
                            launch { context.listener.executionStarted(descriptor) }
                        }
                    }

                // Same context as the caller's, except for the forwarding listener defined above.
                val newContext = object : ExperimentExecutionContext by context {
                    override val listener: ExperimentExecutionListener = listener
                }

                try {
                    withContext(dispatcher) {
                        descriptor(newContext)
                        ExperimentExecutionResult.Success
                    }
                } catch (e: kotlinx.coroutines.CancellationException) {
                    // Cancellation is not a failure of the trial: propagate it so that structured concurrency
                    // keeps working instead of reporting it as a failed result.
                    throw e
                } catch (e: Throwable) {
                    ExperimentExecutionResult.Failed(e)
                } finally {
                    // Hand the ticket back regardless of the outcome, so that another worker can be allocated.
                    tickets.release()
                }
            }
        }
    }

    /**
     * Close the dispatcher that backs this scheduler.
     */
    override fun close() = dispatcher.close()
}
/**
 * The default implementation of the [ExperimentRunner] interface. It blocks the calling thread until the root
 * descriptor has finished.
 *
 * @param scheduler The scheduler that is made available to the descriptors via the execution context.
 */
public class DefaultExperimentRunner(val scheduler: ExperimentScheduler) : ExperimentRunner {
    override val id: String = "default"

    override val version: String? = "1.0"

    /**
     * Execute [root] and report its registration, start and outcome to [listener].
     *
     * @param root The descriptor to execute.
     * @param listener The listener that is notified of the progress of the execution.
     */
    override fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) = runBlocking {
        val sink = listener
        val executionContext = object : ExperimentExecutionContext {
            override val listener: ExperimentExecutionListener = sink
            override val scheduler: ExperimentScheduler = this@DefaultExperimentRunner.scheduler
            override val cache: MutableMap<Any?, Any?> = ConcurrentHashMap()
        }

        sink.descriptorRegistered(root)
        sink.executionStarted(root)
        try {
            root(executionContext)
            sink.executionFinished(root, ExperimentExecutionResult.Success)
        } catch (cause: Throwable) {
            // Any failure, including one raised while reporting success, is reported as a failed execution.
            sink.executionFinished(root, ExperimentExecutionResult.Failed(cause))
        }
    }
}
/**
 * Base class of the events that occur within the system.
 *
 * @property name The name of the event.
 */
public abstract class Event(val name: String) {
    /**
     * The moment at which this event occurred.
     */
    public abstract val timestamp: Long
}
*/ -data class HostMetrics( - val time: Long, +data class HostEvent( + override val timestamp: Long, val duration: Long, val host: Server, val vmCount: Int, @@ -41,4 +41,4 @@ data class HostMetrics( val cpuUsage: Double, val cpuDemand: Double, val powerDraw: Double -) +) : Event("host-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt index 966662cd..df619632 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt @@ -22,13 +22,13 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20.reporter +package com.atlarge.opendc.experiments.sc20.telemetry /** * A periodic report of the provisioner's metrics. 
*/ -data class ProvisionerMetrics( - val time: Long, +data class ProvisionerEvent( + override val timestamp: Long, val totalHostCount: Int, val availableHostCount: Int, val totalVmCount: Int, @@ -36,4 +36,4 @@ data class ProvisionerMetrics( val inactiveVmCount: Int, val waitingVmCount: Int, val failedVmCount: Int -) +) : Event("provisioner-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt new file mode 100644 index 00000000..497d2c3f --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
/**
 * An event named "run" that carries a [Run] together with the time at which the event occurred.
 *
 * @property run The run this event refers to.
 * @property timestamp The time of occurrence of this event.
 */
data class RunEvent(
    val run: Run,
    override val timestamp: Long
) : Event("run")
*/ -package com.atlarge.opendc.experiments.sc20.reporter +package com.atlarge.opendc.experiments.sc20.telemetry.parquet +import com.atlarge.opendc.experiments.sc20.telemetry.Event import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData @@ -36,24 +37,35 @@ import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue import kotlin.concurrent.thread +/** + * The logging instance to use. + */ private val logger = KotlinLogging.logger {} -public abstract class ParquetMetricsWriter<T>( +/** + * A writer that writes events in Parquet format. + */ +public open class ParquetEventWriter<in T : Event>( private val path: File, private val schema: Schema, + private val converter: (T, GenericData.Record) -> Unit, private val bufferSize: Int = 4096 ) : Runnable, Closeable { /** * The queue of commands to process. */ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize) + + /** + * The thread that is responsible for writing the Parquet records. + */ private val writerThread = thread(start = true, name = "parquet-writer") { run() } /** * Write the specified metrics to the database. */ - public fun write(scenario: Long, run: Int, metrics: T) { - queue.put(Action.Write(scenario, run, metrics)) + public fun write(event: T) { + queue.put(Action.Write(event)) } /** @@ -65,11 +77,6 @@ public abstract class ParquetMetricsWriter<T>( } /** - * Persist the specified metrics to the given [row]. - */ - public abstract fun persist(action: Action.Write<T>, row: GenericData.Record) - - /** * Start the writer thread. 
*/ override fun run() { @@ -88,7 +95,7 @@ public abstract class ParquetMetricsWriter<T>( is Action.Write<*> -> { val record = GenericData.Record(schema) @Suppress("UNCHECKED_CAST") - persist(action as Action.Write<T>, record) + converter(action.event as T, record) writer.write(record) } } @@ -109,6 +116,6 @@ public abstract class ParquetMetricsWriter<T>( /** * Write the specified metrics to the database. */ - data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action() + data class Write<out T : Event>(val event: T) : Action() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt new file mode 100644 index 00000000..7e5ad911 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt @@ -0,0 +1,81 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
/**
 * A [ParquetEventWriter] specialised for [HostEvent]s.
 */
public class ParquetHostEventWriter(path: File, bufferSize: Int) :
    ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {

    override fun toString(): String = "host-writer"

    companion object {
        /**
         * Copies the fields of a [HostEvent] into the record that is handed to the Parquet writer.
         */
        val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
            val host = event.host
            with(record) {
                // The run identifiers are currently not written:
                // put("portfolio_id", event.run.parent.parent.id)
                // put("scenario_id", event.run.parent.id)
                // put("run_id", event.run.id)
                put("host_id", host.name)
                put("state", host.state.name)
                put("timestamp", event.timestamp)
                put("duration", event.duration)
                put("vm_count", event.vmCount)
                put("requested_burst", event.requestedBurst)
                put("granted_burst", event.grantedBurst)
                put("overcommissioned_burst", event.overcommissionedBurst)
                put("interfered_burst", event.interferedBurst)
                put("cpu_usage", event.cpuUsage)
                put("cpu_demand", event.cpuDemand)
                put("power_draw", event.powerDraw)
            }
        }

        /**
         * The Avro schema of the records produced by [convert].
         */
        val schema: Schema = SchemaBuilder
            .record("host_metrics")
            .namespace("com.atlarge.opendc.experiments.sc20")
            .fields()
            // .name("portfolio_id").type().intType().noDefault()
            // .name("scenario_id").type().intType().noDefault()
            // .name("run_id").type().intType().noDefault()
            .name("timestamp").type().longType().noDefault()
            .name("duration").type().longType().noDefault()
            .name("host_id").type().stringType().noDefault()
            .name("state").type().stringType().noDefault()
            .name("vm_count").type().intType().noDefault()
            .name("requested_burst").type().longType().noDefault()
            .name("granted_burst").type().longType().noDefault()
            .name("overcommissioned_burst").type().longType().noDefault()
            .name("interfered_burst").type().longType().noDefault()
            .name("cpu_usage").type().doubleType().noDefault()
            .name("cpu_demand").type().doubleType().noDefault()
            .name("power_draw").type().doubleType().noDefault()
            .endRecord()
    }
}
/**
 * A [ParquetEventWriter] specialised for [ProvisionerEvent]s.
 */
public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
    ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {

    override fun toString(): String = "provisioner-writer"

    companion object {
        /**
         * Copies the fields of a [ProvisionerEvent] into the record that is handed to the Parquet writer.
         */
        val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
            with(record) {
                put("timestamp", event.timestamp)
                put("host_total_count", event.totalHostCount)
                put("host_available_count", event.availableHostCount)
                put("vm_total_count", event.totalVmCount)
                put("vm_active_count", event.activeVmCount)
                put("vm_inactive_count", event.inactiveVmCount)
                put("vm_waiting_count", event.waitingVmCount)
                put("vm_failed_count", event.failedVmCount)
            }
        }

        /**
         * The Avro schema of the records produced by [convert].
         */
        val schema: Schema = SchemaBuilder
            .record("provisioner_metrics")
            .namespace("com.atlarge.opendc.experiments.sc20")
            .fields()
            .name("timestamp").type().longType().noDefault()
            .name("host_total_count").type().intType().noDefault()
            .name("host_available_count").type().intType().noDefault()
            .name("vm_total_count").type().intType().noDefault()
            .name("vm_active_count").type().intType().noDefault()
            .name("vm_inactive_count").type().intType().noDefault()
            .name("vm_waiting_count").type().intType().noDefault()
            .name("vm_failed_count").type().intType().noDefault()
            .endRecord()
    }
}
/**
 * A [ParquetEventWriter] specialised for [RunEvent]s.
 */
public class ParquetRunEventWriter(path: File, bufferSize: Int) :
    ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {

    override fun toString(): String = "run-writer"

    companion object {
        /**
         * Copies the run of a [RunEvent], together with the scenario and portfolio it belongs to, into the record
         * that is handed to the Parquet writer.
         */
        val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
            val run = event.run
            val scenario = run.parent
            val portfolio = scenario.parent
            val workload = scenario.workload
            val phenomena = scenario.operationalPhenomena

            with(record) {
                put("portfolio_id", portfolio.id)
                put("portfolio_name", portfolio.name)
                put("scenario_id", scenario.id)
                put("run_id", run.id)
                put("repetitions", scenario.repetitions)
                put("topology", scenario.topology.name)
                put("workload_name", workload.name)
                put("workload_fraction", workload.fraction)
                put("allocation_policy", scenario.allocationPolicy)
                put("failure_frequency", phenomena.failureFrequency)
                put("interference", phenomena.hasInterference)
                put("seed", run.seed)
            }
        }

        /**
         * The Avro schema of the records produced by [convert].
         */
        val schema: Schema = SchemaBuilder
            .record("runs")
            .namespace("com.atlarge.opendc.experiments.sc20")
            .fields()
            .name("portfolio_id").type().intType().noDefault()
            .name("portfolio_name").type().stringType().noDefault()
            .name("scenario_id").type().intType().noDefault()
            .name("run_id").type().intType().noDefault()
            .name("repetitions").type().intType().noDefault()
            .name("topology").type().stringType().noDefault()
            .name("workload_name").type().stringType().noDefault()
            .name("workload_fraction").type().doubleType().noDefault()
            .name("allocation_policy").type().stringType().noDefault()
            .name("failure_frequency").type().doubleType().noDefault()
            .name("interference").type().booleanType().noDefault()
            .name("seed").type().intType().noDefault()
            .endRecord()
    }
}
b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 28026fde..96b6b426 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -28,8 +28,7 @@ import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.experiments.sc20.Run -import com.atlarge.opendc.experiments.sc20.sampleWorkload +import com.atlarge.opendc.experiments.sc20.experiment.Run import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import kotlin.random.Random diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index 99634e1b..e03c59bc 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -22,9 +22,10 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.trace import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.experiment.Run import com.atlarge.opendc.format.trace.TraceEntry import mu.KotlinLogging import kotlin.random.Random @@ -42,7 +43,7 @@ fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEnt * Sample a regular (non-HPC) workload. 
*/ fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEntry<VmWorkload>> { - val fraction = run.scenario.workload.fraction + val fraction = run.parent.workload.fraction if (fraction >= 1) { return trace } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt deleted file mode 100644 index 4fcfdb6b..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
/**
 * A helper that wraps a database [Connection] and the prepared statements used to record experiments,
 * portfolios, scenarios and runs.
 *
 * @property conn The connection the statements are prepared on; it is closed by [close].
 */
class DatabaseHelper(val conn: Connection) : Closeable {
    /**
     * Statement that inserts a new experiment.
     */
    private val createExperiment = conn.prepareStatement("INSERT INTO experiments DEFAULT VALUES", arrayOf("id"))

    /**
     * Statement that inserts a new portfolio.
     */
    private val createPortfolio = conn.prepareStatement("INSERT INTO portfolios (experiment_id, name) VALUES (?, ?)", arrayOf("id"))

    /**
     * Statement that inserts a new scenario.
     */
    private val createScenario = conn.prepareStatement("INSERT INTO scenarios (portfolio_id, repetitions, topology, workload_name, workload_fraction, allocation_policy, failure_frequency, interference) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", arrayOf("id"))

    /**
     * Statement that inserts a new run.
     */
    private val createRun = conn.prepareStatement("INSERT INTO runs (id, scenario_id, seed) VALUES (?, ?, ?)", arrayOf("id"))

    /**
     * Statement that marks a run as active.
     */
    private val startRun = conn.prepareStatement("UPDATE runs SET state = 'active'::run_state,start_time = now() WHERE id = ? AND scenario_id = ?")

    /**
     * Statement that marks a run as finished.
     */
    private val finishRun = conn.prepareStatement("UPDATE runs SET state = ?::run_state,end_time = now() WHERE id = ? AND scenario_id = ?")

    /**
     * Insert a new experiment.
     *
     * @return The generated identifier of the experiment.
     */
    fun createExperiment(): Long = with(createExperiment) {
        check(executeUpdate() != 0)
        latestId
    }

    /**
     * Insert the given [Portfolio] as part of the experiment identified by [experimentId].
     *
     * @return The generated identifier of the portfolio.
     */
    fun persist(portfolio: Portfolio, experimentId: Long): Long = with(createPortfolio) {
        setLong(1, experimentId)
        setString(2, portfolio.name)

        check(executeUpdate() != 0)
        latestId
    }

    /**
     * Insert the given [Scenario] as part of the portfolio identified by [portfolioId].
     *
     * @return The generated identifier of the scenario.
     */
    fun persist(scenario: Scenario, portfolioId: Long): Long = with(createScenario) {
        setLong(1, portfolioId)
        setInt(2, scenario.repetitions)
        setString(3, scenario.topology.name)
        setString(4, scenario.workload.name)
        setDouble(5, scenario.workload.fraction)
        setString(6, scenario.allocationPolicy)
        setDouble(7, scenario.failureFrequency)
        setBoolean(8, scenario.hasInterference)

        check(executeUpdate() != 0)
        latestId
    }

    /**
     * Insert the given [Run] as part of the scenario identified by [scenarioId].
     *
     * @return The generated identifier of the run.
     */
    fun persist(run: Run, scenarioId: Long): Int = with(createRun) {
        setInt(1, run.id)
        setLong(2, scenarioId)
        setInt(3, run.seed)

        check(executeUpdate() != 0)
        latestId.toInt()
    }

    /**
     * Mark the given run of the given scenario as active.
     */
    fun startRun(scenario: Long, run: Int) {
        with(startRun) {
            setInt(1, run)
            setLong(2, scenario)

            check(executeUpdate() != 0)
        }
    }

    /**
     * Mark the given run of the given scenario as finished, recording whether it has failed.
     */
    fun finishRun(scenario: Long, run: Int, hasFailed: Boolean) {
        val state = when {
            hasFailed -> "fail"
            else -> "ok"
        }

        with(finishRun) {
            setString(1, state)
            setInt(2, run)
            setLong(3, scenario)

            check(executeUpdate() != 0)
        }
    }

    /**
     * The first generated key of the most recent update executed on this statement.
     */
    private val Statement.latestId: Long
        get() {
            val keys = generatedKeys
            try {
                check(keys.next())
                return keys.getLong(1)
            } finally {
                keys.close()
            }
        }

    /**
     * Close the underlying connection.
     */
    override fun close() {
        conn.close()
    }
}
res.first scheduler = res.second val failureDomain = if (failures) { println("ENABLING failures") - createFailureDomain(seed, 24.0 * 7, bareMetalProvisioner, chan) + createFailureDomain( + seed, + 24.0 * 7, + bareMetalProvisioner, + chan + ) } else { null } attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, monitor) + processTrace( + traceReader, + scheduler, + chan, + monitor + ) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") @@ -141,7 +160,12 @@ class Sc20IntegrationTest { val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json") val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) .construct() - return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0) + return createTraceReader( + File("src/test/resources/trace"), + performanceInterferenceModel, + emptyList(), + 0 + ) } /** @@ -152,7 +176,7 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestExperimentReporter : ExperimentReporter { + class TestExperimentReporter : ExperimentMonitor { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L |
