summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts13
-rw-r--r--opendc/opendc-experiments-sc20/schema.sql141
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt216
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt)136
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt78
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt)28
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt90
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt)67
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt)93
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt48
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt33
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt)2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt)2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt)6
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt)37
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt75
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt73
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt65
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt128
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt75
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt124
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt67
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt68
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt81
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt51
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt)9
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt45
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt48
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt)18
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt59
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt85
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt62
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt)10
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt)8
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt)8
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt35
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt)8
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt)29
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt67
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt78
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt3
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt)5
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt160
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt36
45 files changed, 1331 insertions, 1320 deletions
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 2ba07554..46d99564 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -31,27 +31,26 @@ plugins {
}
application {
- mainClassName = "com.atlarge.opendc.experiments.sc20.ExperimentRunnerCliKt"
- applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M")
+ mainClassName = "com.atlarge.opendc.experiments.sc20.MainKt"
+ applicationDefaultJvmArgs = listOf("-Xms2500M")
}
dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
implementation(kotlin("stdlib"))
+
implementation("com.github.ajalt:clikt:2.6.0")
+ implementation("me.tongfei:progressbar:0.8.1")
implementation("io.github.microutils:kotlin-logging:1.7.9")
+
implementation("org.apache.parquet:parquet-avro:1.11.0")
implementation("org.apache.hadoop:hadoop-client:3.2.1") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
- implementation("com.zaxxer:HikariCP:3.4.5")
- implementation("de.bytefish.pgbulkinsert:pgbulkinsert-core:5.1.0")
- implementation("de.bytefish.pgbulkinsert:pgbulkinsert-rowwriter:5.1.0")
- implementation("me.tongfei:progressbar:0.8.1")
+
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
- runtimeOnly("org.postgresql:postgresql:42.2.12")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql
deleted file mode 100644
index 92cb5d1f..00000000
--- a/opendc/opendc-experiments-sc20/schema.sql
+++ /dev/null
@@ -1,141 +0,0 @@
--- An experiment represents a collection of portfolios.
-DROP TABLE IF EXISTS experiments CASCADE;
-CREATE TABLE experiments
-(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- creation_time TIMESTAMP NOT NULL DEFAULT (now())
-);
-
--- A portfolio represents a collection of scenarios tested.
-DROP TABLE IF EXISTS portfolios CASCADE;
-CREATE TABLE portfolios
-(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- experiment_id BIGINT NOT NULL,
- name TEXT NOT NULL,
-
- FOREIGN KEY (experiment_id) REFERENCES experiments (id)
- ON DELETE CASCADE
- ON UPDATE CASCADE
-);
-
--- A scenario represents a single point in the design space (a unique combination of parameters)
-DROP TABLE IF EXISTS scenarios CASCADE;
-CREATE TABLE scenarios
-(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- portfolio_id BIGINT NOT NULL,
- repetitions INTEGER NOT NULL,
- topology TEXT NOT NULL,
- workload_name TEXT NOT NULL,
- workload_fraction DOUBLE PRECISION NOT NULL,
- allocation_policy TEXT NOT NULL,
- failure_frequency DOUBLE PRECISION NOT NULL,
- interference BOOLEAN NOT NULL,
-
- FOREIGN KEY (portfolio_id) REFERENCES portfolios (id)
- ON DELETE CASCADE
- ON UPDATE CASCADE
-
-);
-
-DROP TYPE IF EXISTS run_state CASCADE;
-CREATE TYPE run_state AS ENUM ('wait', 'active', 'fail', 'ok');
-
--- An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
--- same set of parameters.
-DROP TABLE IF EXISTS runs CASCADE;
-CREATE TABLE runs
-(
- id INTEGER NOT NULL,
- scenario_id BIGINT NOT NULL,
- seed INTEGER NOT NULL,
- state run_state NOT NULL DEFAULT 'wait'::run_state,
- submit_time TIMESTAMP NOT NULL DEFAULT (now()),
- start_time TIMESTAMP DEFAULT (NULL),
- end_time TIMESTAMP DEFAULT (NULL),
-
- PRIMARY KEY (scenario_id, id),
- FOREIGN KEY (scenario_id) REFERENCES scenarios (id)
- ON DELETE CASCADE
- ON UPDATE CASCADE
-);
-
--- Metrics of the hypervisors reported per slice
-DROP TABLE IF EXISTS host_metrics CASCADE;
-CREATE TABLE host_metrics
-(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- scenario_id BIGINT NOT NULL,
- run_id INTEGER NOT NULL,
- host_id TEXT NOT NULL,
- state TEXT NOT NULL,
- timestamp TIMESTAMP NOT NULL,
- duration BIGINT NOT NULL,
- vm_count INTEGER NOT NULL,
- requested_burst BIGINT NOT NULL,
- granted_burst BIGINT NOT NULL,
- overcommissioned_burst BIGINT NOT NULL,
- interfered_burst BIGINT NOT NULL,
- cpu_usage DOUBLE PRECISION NOT NULL,
- cpu_demand DOUBLE PRECISION NOT NULL,
- power_draw DOUBLE PRECISION NOT NULL,
-
- FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id)
- ON DELETE CASCADE
- ON UPDATE CASCADE
-);
-
-DROP INDEX IF EXISTS host_metrics_idx;
-CREATE INDEX host_metrics_idx ON host_metrics (scenario_id, run_id, timestamp, host_id);
-
--- Metrics of the VMs reported per slice
-DROP TABLE IF EXISTS vm_metrics CASCADE;
-CREATE TABLE vm_metrics
-(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- scenario_id BIGINT NOT NULL,
- run_id INTEGER NOT NULL,
- vm_id TEXT NOT NULL,
- host_id TEXT NOT NULL,
- state TEXT NOT NULL,
- timestamp TIMESTAMP NOT NULL,
- duration BIGINT NOT NULL,
- requested_burst BIGINT NOT NULL,
- granted_burst BIGINT NOT NULL,
- overcommissioned_burst BIGINT NOT NULL,
- interfered_burst BIGINT NOT NULL,
- cpu_usage DOUBLE PRECISION NOT NULL,
- cpu_demand DOUBLE PRECISION NOT NULL,
-
- FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id)
- ON DELETE CASCADE
- ON UPDATE CASCADE
-);
-
-DROP INDEX IF EXISTS vm_metrics_idx;
-CREATE INDEX vm_metrics_idx ON vm_metrics (scenario_id, run_id, timestamp, vm_id);
-
--- Metrics of the provisioner reported per change
-DROP TABLE IF EXISTS provisioner_metrics CASCADE;
-CREATE TABLE provisioner_metrics
-(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- scenario_id BIGINT NOT NULL,
- run_id INTEGER NOT NULL,
- timestamp TIMESTAMP NOT NULL,
- host_total_count INTEGER NOT NULL,
- host_available_count INTEGER NOT NULL,
- vm_total_count INTEGER NOT NULL,
- vm_active_count INTEGER NOT NULL,
- vm_inactive_count INTEGER NOT NULL,
- vm_waiting_count INTEGER NOT NULL,
- vm_failed_count INTEGER NOT NULL,
-
- FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id)
- ON DELETE CASCADE
- ON UPDATE CASCADE
-);
-
-DROP INDEX IF EXISTS provisioner_metrics_idx;
-CREATE INDEX provisioner_metrics_idx ON provisioner_metrics (scenario_id, run_id, timestamp);
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
deleted file mode 100644
index e6fd504e..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20
-
-import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
-import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
-import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
-import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper
-import com.atlarge.opendc.format.environment.EnvironmentReader
-import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import com.atlarge.opendc.format.trace.TraceReader
-import me.tongfei.progressbar.ProgressBar
-import mu.KotlinLogging
-import java.io.Closeable
-import java.io.File
-import java.util.concurrent.ExecutorCompletionService
-import java.util.concurrent.Executors
-import javax.sql.DataSource
-
-/**
- * The logger for the experiment runner.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * The experiment runner is responsible for orchestrating the simulation runs of an experiment.
- *
- * @param portfolios The portfolios to consider.
- * @param ds The data source to write the experimental results to.
- */
-public class ExperimentRunner(
- private val portfolios: List<Portfolio>,
- private val ds: DataSource,
- private val reporterProvider: ExperimentReporterProvider,
- private val environmentPath: File,
- private val tracePath: File,
- private val performanceInterferenceModel: PerformanceInterferenceModel?,
- private val parallelism: Int = Runtime.getRuntime().availableProcessors()
-) : Closeable {
- /**
- * The database helper to write the execution plan.
- */
- private val helper = DatabaseHelper(ds.connection)
-
- /**
- * The experiment identifier.
- */
- private var experimentId = -1L
-
- /**
- * The mapping of portfolios to their ids.
- */
- private val portfolioIds = mutableMapOf<Portfolio, Long>()
-
- /**
- * The mapping of scenarios to their ids.
- */
- private val scenarioIds = mutableMapOf<Scenario, Long>()
-
- init {
- reporterProvider.init(ds)
- }
-
- /**
- * Create an execution plan
- */
- private fun createPlan(): List<Run> {
- val runs = mutableListOf<Run>()
-
- for (portfolio in portfolios) {
- val portfolioId = helper.persist(portfolio, experimentId)
- portfolioIds[portfolio] = portfolioId
- var scenarios = 0
- var runCount = 0
-
- for (scenario in portfolio.scenarios) {
- val scenarioId = helper.persist(scenario, portfolioId)
- scenarioIds[scenario] = scenarioId
- scenarios++
-
- for (run in scenario.runs) {
- helper.persist(run, scenarioId)
- runCount++
- runs.add(run)
- }
- }
-
- logger.info { "Portfolio $portfolioId: ${portfolio.name} ($scenarios scenarios, $runCount runs total)" }
- }
-
- return runs
- }
-
- /**
- * The raw parquet trace readers that are shared across simulations.
- */
- private val rawTraceReaders = mutableMapOf<String, Sc20RawParquetTraceReader>()
-
- /**
- * Create a trace reader for the specified trace.
- */
- private fun createTraceReader(
- name: String,
- performanceInterferenceModel: PerformanceInterferenceModel?,
- run: Run
- ): TraceReader<VmWorkload> {
- val raw = rawTraceReaders.getValue(name)
- return Sc20ParquetTraceReader(
- raw,
- performanceInterferenceModel,
- run
- )
- }
-
- /**
- * Create the environment reader for the specified environment.
- */
- private fun createEnvironmentReader(name: String): EnvironmentReader {
- return Sc20ClusterEnvironmentReader(File(environmentPath, "$name.txt"))
- }
-
- /**
- * Run the portfolios.
- */
- @OptIn(ExperimentalStdlibApi::class)
- public fun run() {
- experimentId = helper.createExperiment()
- logger.info { "Creating execution plan for experiment $experimentId" }
-
- val plan = createPlan()
- val total = plan.size
- val completionService = ExecutorCompletionService<Unit>(Executors.newCachedThreadPool())
- val pb = ProgressBar("Experiment", total.toLong())
-
- var running = 0
-
- for (run in plan) {
- if (running >= parallelism) {
- completionService.take()
- running--
- }
-
- val scenarioId = scenarioIds[run.scenario]!!
-
- rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name ->
- logger.info { "Loading trace $name" }
- Sc20RawParquetTraceReader(File(tracePath, name))
- }
-
- completionService.submit {
- pb.extraMessage = "($scenarioId, ${run.id}) START"
-
- var hasFailed = false
- synchronized(helper) {
- helper.startRun(scenarioId, run.id)
- }
-
- try {
- val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
- val traceReader =
- createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
- val environmentReader = createEnvironmentReader(run.scenario.topology.name)
-
- try {
- run.scenario(run, reporter, environmentReader, traceReader)
- } finally {
- reporter.close()
- }
-
- pb.extraMessage = "($scenarioId, ${run.id}) OK"
- } catch (e: Throwable) {
- logger.error("A run has failed", e)
- hasFailed = true
- pb.extraMessage = "($scenarioId, ${run.id}) FAIL"
- } finally {
- synchronized(helper) {
- helper.finishRun(scenarioId, run.id, hasFailed = hasFailed)
- }
-
- pb.step()
- }
- }
-
- running++
- }
- }
-
- override fun close() {
- reporterProvider.close()
- helper.close()
- }
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 631c1085..e17a145c 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -24,20 +24,19 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
-import com.atlarge.opendc.experiments.sc20.reporter.ParquetHostMetricsWriter
-import com.atlarge.opendc.experiments.sc20.reporter.ParquetProvisionerMetricsWriter
-import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter
-import com.atlarge.opendc.experiments.sc20.reporter.PostgresProvisionerMetricsWriter
+import com.atlarge.opendc.experiments.sc20.experiment.Experiment
+import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.Portfolio
+import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ThreadPoolExperimentScheduler
+import com.atlarge.opendc.experiments.sc20.runner.internal.DefaultExperimentRunner
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.groups.required
import com.github.ajalt.clikt.parameters.options.convert
import com.github.ajalt.clikt.parameters.options.default
import com.github.ajalt.clikt.parameters.options.defaultLazy
@@ -47,11 +46,9 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.int
-import com.zaxxer.hikari.HikariDataSource
import mu.KotlinLogging
import java.io.File
import java.io.InputStream
-import javax.sql.DataSource
/**
* The logger for this experiment.
@@ -63,11 +60,6 @@ private val logger = KotlinLogging.logger {}
*/
class ExperimentCli : CliktCommand(name = "sc20-experiment") {
/**
- * The JDBC connection url to use.
- */
- private val jdbcUrl by option("--jdbc-url", help = "JDBC connection url").required()
-
- /**
* The path to the directory where the topology descriptions are located.
*/
private val environmentPath by option("--environment-path", help = "path to the environment directory")
@@ -99,22 +91,14 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
.default(emptyMap())
/**
- * The type of reporter to use.
- */
- private val reporter by option().groupChoice(
- "parquet" to Reporter.Parquet(),
- "postgres" to Reporter.Postgres()
- ).required()
-
- /**
* The selected portfolios to run.
*/
private val portfolios by option("--portfolio")
.choice(
- "hor-ver" to HorVerPortfolio,
- "more-velocity" to MoreVelocityPortfolio,
- "more-hpc" to MoreHpcPortfolio,
- "operational-phenomena" to OperationalPhenomenaPortfolio,
+ "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio,
+ "more-velocity" to ({ experiment, i -> MoreVelocityPortfolio(experiment, i) }),
+ "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) },
+ "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) },
ignoreCase = true
)
.multiple()
@@ -122,93 +106,47 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
/**
* The maximum number of worker threads to use.
*/
- private val workerParallelism by option("--worker-parallelism")
+ private val parallelism by option("--parallelism")
.int()
.default(Runtime.getRuntime().availableProcessors())
/**
- * The maximum number of host writer threads to use.
- */
- private val hostWriterParallelism by option("--host-writer-parallelism")
- .int()
- .default(8)
-
- /**
- * The maximum number of provisioner writer threads to use.
- */
- private val provisionerWriterParallelism by option("--provisioner-writer-parallelism")
- .int()
- .default(1)
-
- /**
* The buffer size for writing results.
*/
private val bufferSize by option("--buffer-size")
.int()
.default(4096)
- override fun run() {
- val ds = HikariDataSource()
- ds.maximumPoolSize = Runtime.getRuntime().availableProcessors() * 3
- ds.jdbcUrl = jdbcUrl
- ds.addDataSourceProperty("reWriteBatchedInserts", "true")
+ /**
+ * The path to the output directory.
+ */
+ private val output by option("-O", "--output", help = "path to the output directory")
+ .file(canBeFile = false)
+ .defaultLazy { File("data") }
- reporter.bufferSize = bufferSize
- reporter.hostParallelism = hostWriterParallelism
- reporter.provisionerParallelism = provisionerWriterParallelism
+ override fun run() {
+ logger.info { "Constructing performance interference model" }
val performanceInterferenceModel =
performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() }
- val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, workerParallelism)
+ logger.info { "Creating experiment descriptor" }
+ val descriptor = object : Experiment(environmentPath, tracePath, output, performanceInterferenceModel, vmPlacements, bufferSize) {
+ private val descriptor = this
+ override val children: Sequence<ExperimentDescriptor> = sequence {
+ for ((i, producer) in portfolios.withIndex()) {
+ yield(producer(descriptor, i))
+ }
+ }
+ }
+ logger.info { "Starting experiment runner [parallelism=$parallelism]" }
+ val scheduler = ThreadPoolExperimentScheduler(parallelism)
+ val runner = DefaultExperimentRunner(scheduler)
try {
- runner.run()
+ runner.execute(descriptor, ConsoleExperimentReporter())
} finally {
- runner.close()
- ds.close()
- }
- }
-}
-
-/**
- * An option for specifying the type of reporter to use.
- */
-internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider {
- var bufferSize = 4096
- var hostParallelism = 8
- var provisionerParallelism = 1
-
- class Parquet : Reporter("Options for reporting using Parquet") {
- private val path by option("--parquet-directory", help = "path to where the output should be stored")
- .file()
- .defaultLazy { File("data") }
-
- override fun createReporter(scenario: Long, run: Int): ExperimentReporter {
- val hostWriter = ParquetHostMetricsWriter(File(path, "$scenario-$run-host.parquet"), bufferSize)
- val provisionerWriter = ParquetProvisionerMetricsWriter(File(path, "$scenario-$run-provisioner.parquet"), bufferSize)
- return ExperimentParquetReporter(scenario, run, hostWriter, provisionerWriter)
- }
-
- override fun close() {}
- }
-
- class Postgres : Reporter("Options for reporting using PostgreSQL") {
- lateinit var hostWriter: PostgresHostMetricsWriter
- lateinit var provisionerWriter: PostgresProvisionerMetricsWriter
-
- override fun init(ds: DataSource) {
- hostWriter = PostgresHostMetricsWriter(ds, hostParallelism, bufferSize)
- provisionerWriter = PostgresProvisionerMetricsWriter(ds, provisionerParallelism, bufferSize)
- }
-
- override fun createReporter(scenario: Long, run: Int): ExperimentReporter {
- return ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter)
- }
-
- override fun close() {
- hostWriter.close()
- provisionerWriter.close()
+ scheduler.close()
}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt
new file mode 100644
index 00000000..5feb5917
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt
@@ -0,0 +1,78 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetRunEventWriter
+import java.io.File
+
+/**
+ * The global configuration of the experiment.
+ *
+ * @param environments The path to the topologies directory.
+ * @param traces The path to the traces directory.
+ * @param output The output directory.
+ * @param performanceInterferenceModel The optional performance interference model that has been specified.
+ * @param vmPlacements Original VM placement in the trace.
+ * @param bufferSize The buffer size of the event reporters.
+ */
+public abstract class Experiment(
+ val environments: File,
+ val traces: File,
+ val output: File,
+ val performanceInterferenceModel: PerformanceInterferenceModel?,
+ val vmPlacements: Map<String, String>,
+ val bufferSize: Int
+) : ContainerExperimentDescriptor() {
+ override val parent: ExperimentDescriptor? = null
+
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val writer = ParquetRunEventWriter(File(output, "experiments.parquet"), bufferSize)
+ try {
+ val listener = object : ExperimentExecutionListener by context.listener {
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ if (descriptor is Run) {
+ writer.write(RunEvent(descriptor, System.currentTimeMillis()))
+ }
+
+ context.listener.descriptorRegistered(descriptor)
+ }
+ }
+
+ val newContext = object : ExperimentExecutionContext by context {
+ override val listener: ExperimentExecutionListener = listener
+ }
+
+ super.invoke(newContext)
+ } finally {
+ writer.close()
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 1bc463c3..32dc87ef 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.experiment
import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.simulationContext
@@ -41,7 +41,7 @@ import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
@@ -83,7 +83,13 @@ suspend fun createFailureDomain(
for (node in bareMetalProvisioner.nodes()) {
val cluster = node.metadata[NODE_CLUSTER] as String
val injector =
- injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) }
+ injectors.getOrPut(cluster) {
+ createFaultInjector(
+ simulationContext.domain,
+ random,
+ failureInterval
+ )
+ }
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
@@ -143,7 +149,7 @@ suspend fun createProvisioner(
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) {
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
val domain = simulationContext.domain
val clock = simulationContext.clock
val hypervisors = scheduler.drivers()
@@ -151,13 +157,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server)
+ monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server)
hypervisor.server.events
.onEach { event ->
val time = clock.millis()
when (event) {
is ServerEvent.StateChanged -> {
- reporter.reportHostStateChange(time, hypervisor, event.server)
+ monitor.reportHostStateChange(time, hypervisor, event.server)
}
}
}
@@ -165,7 +171,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> reporter.reportHostSlice(
+ is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
simulationContext.clock.millis(),
event.requestedBurst,
event.grantedBurst,
@@ -182,7 +188,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
val driver = hypervisor.server.services[BareMetalDriver.Key]
driver.powerDraw
- .onEach { reporter.reportPowerConsumption(hypervisor.server, it) }
+ .onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
.launchIn(domain)
}
@@ -190,7 +196,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
.onEach { event ->
when (event) {
is VirtProvisioningEvent.MetricsAvailable ->
- reporter.reportProvisionerMetrics(clock.millis(), event)
+ monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
.launchIn(domain)
@@ -199,7 +205,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: ExperimentReporter, vmPlacements: Map<String, String> = emptyMap()) {
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
val domain = simulationContext.domain
try {
@@ -239,7 +245,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
val time = simulationContext.clock.millis()
if (it is ServerEvent.StateChanged) {
- reporter.reportVmStateChange(time, it.server)
+ monitor.reportVmStateChange(time, it.server)
}
delay(1)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt
new file mode 100644
index 00000000..6a40f5fb
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt
@@ -0,0 +1,90 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+
+/**
+ * A portfolio represents a collection of scenarios are tested.
+ */
+public abstract class Portfolio(
+ override val parent: Experiment,
+ val id: Int,
+ val name: String
+) : ContainerExperimentDescriptor() {
+ /**
+ * The topologies to consider.
+ */
+ protected abstract val topologies: List<Topology>
+
+ /**
+ * The workloads to consider.
+ */
+ protected abstract val workloads: List<Workload>
+
+ /**
+ * The operational phenomenas to consider.
+ */
+ protected abstract val operationalPhenomenas: List<OperationalPhenomena>
+
+ /**
+ * The allocation policies to consider.
+ */
+ protected abstract val allocationPolicies: List<String>
+
+ /**
+ * The number of repetitions to perform.
+ */
+ open val repetitions: Int = 32
+
+ /**
+ * Resolve the children of this container.
+ */
+ override val children: Sequence<Scenario> = sequence {
+ var id = 0
+ for (topology in topologies) {
+ for (workload in workloads) {
+ for (operationalPhenomena in operationalPhenomenas) {
+ for (allocationPolicy in allocationPolicies) {
+ yield(
+ Scenario(
+ this@Portfolio,
+ id++,
+ repetitions,
+ topology,
+ workload,
+ allocationPolicy,
+ operationalPhenomena
+ )
+ )
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
index 668304b6..4800ceba 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
@@ -22,42 +22,13 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
-
-abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) {
- abstract val topologies: List<Topology>
- abstract val workloads: List<Workload>
- abstract val operationalPhenomena: List<Pair<Double, Boolean>>
- abstract val allocationPolicies: List<String>
-
- open val repetitions = 8
-
- override val scenarios: Sequence<Scenario> = sequence {
- for (topology in topologies) {
- for (workload in workloads) {
- for ((failureFrequency, hasInterference) in operationalPhenomena) {
- for (allocationPolicy in allocationPolicies) {
- yield(
- Scenario(
- this@AbstractSc20Portfolio,
- repetitions,
- topology,
- workload,
- allocationPolicy,
- failureFrequency,
- hasInterference
- )
- )
- }
- }
- }
- }
- }
-}
+package com.atlarge.opendc.experiments.sc20.experiment
-private val defaultFailureInterval = 24.0 * 7
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
-object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
+public class HorVerPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "horizontal_vs_vertical") {
override val topologies = listOf(
Topology("base"),
Topology("rep-vol-hor-hom"),
@@ -77,8 +48,8 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
Workload("solvinity", 1.0)
)
- override val operationalPhenomena = listOf(
- defaultFailureInterval to true
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
)
override val allocationPolicies = listOf(
@@ -86,7 +57,7 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
)
}
-object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") {
+public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_velocity") {
override val topologies = listOf(
Topology("base"),
Topology("rep-vel-ver-hom"),
@@ -102,8 +73,8 @@ object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") {
Workload("solvinity", 1.0)
)
- override val operationalPhenomena = listOf(
- defaultFailureInterval to true
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
)
override val allocationPolicies = listOf(
@@ -111,7 +82,7 @@ object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") {
)
}
-object MoreHpcPortfolio : AbstractSc20Portfolio("more_hpc") {
+public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") {
override val topologies = listOf(
Topology("base"),
Topology("exp-vol-hor-hom"),
@@ -126,8 +97,8 @@ object MoreHpcPortfolio : AbstractSc20Portfolio("more_hpc") {
Workload("solvinity", 1.0)
)
- override val operationalPhenomena = listOf(
- defaultFailureInterval to true
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
)
override val allocationPolicies = listOf(
@@ -135,7 +106,7 @@ object MoreHpcPortfolio : AbstractSc20Portfolio("more_hpc") {
)
}
-object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("operational_phenomena") {
+public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "operational_phenomena") {
override val topologies = listOf(
Topology("base")
)
@@ -146,11 +117,11 @@ object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("operational_phenom
Workload("solvinity", 1.0)
)
- override val operationalPhenomena = listOf(
- defaultFailureInterval to true,
- 0.0 to true,
- defaultFailureInterval to false,
- defaultFailureInterval to true
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
)
override val allocationPolicies = listOf(
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 457255cb..6d53fd17 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -22,24 +22,26 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.experiment
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
-import com.atlarge.opendc.format.environment.EnvironmentReader
-import com.atlarge.opendc.format.trace.TraceReader
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
+import java.io.File
import java.util.ServiceLoader
import kotlin.random.Random
@@ -54,36 +56,19 @@ private val logger = KotlinLogging.logger {}
private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
/**
- * A scenario represents a single point in the design space (a unique combination of parameters).
+ * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
+ * same set of parameters.
*/
-public class Scenario(
- val portfolio: Portfolio,
- val repetitions: Int,
- val topology: Topology,
- val workload: Workload,
- val allocationPolicy: String,
- val failureFrequency: Double,
- val hasInterference: Boolean
-) {
- /**
- * The runs this scenario consists of.
- */
- public val runs: Sequence<Run> = sequence {
- repeat(repetitions) { i ->
- yield(Run(this@Scenario, i, i))
- }
- }
-
- /**
- * Perform a single run of this scenario.
- */
- public operator fun invoke(run: Run, reporter: ExperimentReporter, environment: EnvironmentReader, trace: TraceReader<VmWorkload>) {
- val system = provider("experiment-${run.id}")
+public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val experiment = parent.parent.parent
+ val system = provider("experiment-$id")
val root = system.newDomain("root")
- val seeder = Random(run.seed)
+ val seeder = Random(seed)
+ val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = when (this.allocationPolicy) {
+ val allocationPolicy = when (parent.allocationPolicy) {
"mem" -> AvailableMemoryAllocationPolicy()
"mem-inv" -> AvailableMemoryAllocationPolicy(true)
"core-mem" -> AvailableCoreMemoryAllocationPolicy()
@@ -94,21 +79,49 @@ public class Scenario(
"provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
"random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
"replay" -> ReplayAllocationPolicy(emptyMap())
- else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}")
+ else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}")
}
+ @Suppress("UNCHECKED_CAST")
+ val rawTraceReaders = context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader>
+ val raw = synchronized(rawTraceReaders) {
+ val name = parent.workload.name
+ rawTraceReaders.computeIfAbsent(name) {
+ logger.info { "Loading trace $name" }
+ Sc20RawParquetTraceReader(File(experiment.traces, name))
+ }
+ }
+ val trace = Sc20ParquetTraceReader(raw, experiment.performanceInterferenceModel, this)
+
+ val monitor = ParquetExperimentMonitor(this)
+
root.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(root, environment, allocationPolicy)
+ val (bareMetalProvisioner, scheduler) = createProvisioner(
+ root,
+ environment,
+ allocationPolicy
+ )
- val failureDomain = if (failureFrequency > 0) {
+ val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
- createFailureDomain(seeder.nextInt(), failureFrequency, bareMetalProvisioner, chan)
+ createFailureDomain(
+ seeder.nextInt(),
+ parent.operationalPhenomena.failureFrequency,
+ bareMetalProvisioner,
+ chan
+ )
} else {
null
}
- attachMonitor(scheduler, reporter)
- processTrace(trace, scheduler, chan, reporter, emptyMap())
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ trace,
+ scheduler,
+ chan,
+ monitor,
+ experiment.vmPlacements
+ )
logger.debug("SUBMIT=${scheduler.submittedVms}")
logger.debug("FAIL=${scheduler.unscheduledVms}")
@@ -120,9 +133,11 @@ public class Scenario(
scheduler.terminate()
}
- runBlocking {
+ try {
system.run()
+ } finally {
system.terminate()
+ monitor.close()
}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt
new file mode 100644
index 00000000..98bc7fc2
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+
+/**
+ * A scenario represents a single point in the design space (a unique combination of parameters).
+ */
+public class Scenario(
+ override val parent: Portfolio,
+ val id: Int,
+ val repetitions: Int,
+ val topology: Topology,
+ val workload: Workload,
+ val allocationPolicy: String,
+ val operationalPhenomena: OperationalPhenomena
+) : ContainerExperimentDescriptor() {
+ override val children: Sequence<ExperimentDescriptor> = sequence {
+ repeat(repetitions) { i -> yield(Run(this@Scenario, i, i)) }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt
new file mode 100644
index 00000000..af99df84
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt
@@ -0,0 +1,33 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.model
+
+/**
+ * Operation phenomena during experiments.
+ *
+ * @param failureFrequency The average time between failures in hours.
+ * @param hasInterference A flag to enable performance interference between VMs.
+ */
+public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt
index d2be9599..3ed71e09 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.experiment.model
/**
* The datacenter topology on which we test the workload.
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
index 4ab5ec8c..2dbdf570 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.experiment.model
/**
* A workload that is considered for a scenario.
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
index 049035cc..1f674f00 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20.reporter
+package com.atlarge.opendc.experiments.sc20.experiment.monitor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
@@ -30,9 +30,9 @@ import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import java.io.Closeable
/**
- * A reporter used by experiments to report metrics.
+ * A monitor watches the events of an experiment.
*/
-interface ExperimentReporter : Closeable {
+interface ExperimentMonitor : Closeable {
/**
* This method is invoked when the state of a VM changes.
*/
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index 9426933e..33978aab 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -22,23 +22,38 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20.reporter
+package com.atlarge.opendc.experiments.sc20.experiment.monitor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter
import mu.KotlinLogging
+import java.io.File
+/**
+ * The logger instance to use.
+ */
private val logger = KotlinLogging.logger {}
-class ExperimentParquetReporter(
- val scenario: Long,
- val run: Int,
- val hostWriter: ParquetHostMetricsWriter,
- val provisionerWriter: ParquetProvisionerMetricsWriter
-) :
- ExperimentReporter {
+/**
+ * An [ExperimentMonitor] that logs the events to a Parquet file.
+ */
+class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
+ private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}"
+ private val hostWriter = ParquetHostEventWriter(
+ File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"),
+ run.parent.parent.parent.bufferSize
+ )
+ private val provisionerWriter = ParquetProvisionerEventWriter(
+ File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"),
+ run.parent.parent.parent.bufferSize
+ )
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
override fun reportVmStateChange(time: Long, server: Server) {}
@@ -92,7 +107,7 @@ class ExperimentParquetReporter(
duration: Long
) {
hostWriter.write(
- scenario, run, HostMetrics(
+ HostEvent(
time,
duration,
hostServer,
@@ -110,9 +125,7 @@ class ExperimentParquetReporter(
override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
provisionerWriter.write(
- scenario,
- run,
- ProvisionerMetrics(
+ ProvisionerEvent(
time,
event.totalHostCount,
event.availableHostCount,
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
new file mode 100644
index 00000000..f59402d5
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
@@ -0,0 +1,75 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import me.tongfei.progressbar.ProgressBar
+import me.tongfei.progressbar.ProgressBarBuilder
+
+/**
+ * A reporter that reports the experiment progress to the console.
+ */
+public class ConsoleExperimentReporter : ExperimentExecutionListener {
+ /**
+ * The active [Run]s.
+ */
+ private val runs: MutableSet<Run> = mutableSetOf()
+
+ /**
+ * The total number of runs.
+ */
+ private var total = 0
+
+ /**
+ * The progress bar to keep track of the progress.
+ */
+ private val pb: ProgressBar = ProgressBarBuilder()
+ .setTaskName("")
+ .setInitialMax(1)
+ .build()
+
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ if (descriptor is Run) {
+ runs += descriptor
+ pb.maxHint((++total).toLong())
+ }
+ }
+
+ override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) {
+ if (descriptor is Run) {
+ runs -= descriptor
+
+ pb.stepTo(total - runs.size.toLong())
+ if (runs.isEmpty()) {
+ pb.close()
+ }
+ }
+ }
+
+ override fun executionStarted(descriptor: ExperimentDescriptor) {}
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt
deleted file mode 100644
index 33839191..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20.reporter
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import java.io.File
-
-private val schema: Schema = SchemaBuilder
- .record("host_metrics")
- .namespace("com.atlarge.opendc.experiments.sc20")
- .fields()
- .name("scenario_id").type().longType().noDefault()
- .name("run_id").type().intType().noDefault()
- .name("timestamp").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("vm_count").type().intType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .endRecord()
-
-public class ParquetHostMetricsWriter(path: File, batchSize: Int) :
- ParquetMetricsWriter<HostMetrics>(path, schema, batchSize) {
-
- override fun persist(action: Action.Write<HostMetrics>, row: GenericData.Record) {
- row.put("scenario_id", action.scenario)
- row.put("run_id", action.run)
- row.put("host_id", action.metrics.host.name)
- row.put("state", action.metrics.host.state.name)
- row.put("timestamp", action.metrics.time)
- row.put("duration", action.metrics.duration)
- row.put("vm_count", action.metrics.vmCount)
- row.put("requested_burst", action.metrics.requestedBurst)
- row.put("granted_burst", action.metrics.grantedBurst)
- row.put("overcommissioned_burst", action.metrics.overcommissionedBurst)
- row.put("interfered_burst", action.metrics.interferedBurst)
- row.put("cpu_usage", action.metrics.cpuUsage)
- row.put("cpu_demand", action.metrics.cpuDemand)
- row.put("power_draw", action.metrics.powerDraw)
- }
-
- override fun toString(): String = "host-writer"
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt
deleted file mode 100644
index 0c74b23e..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20.reporter
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import java.io.File
-
-private val schema: Schema = SchemaBuilder
- .record("host_metrics")
- .namespace("com.atlarge.opendc.experiments.sc20")
- .fields()
- .name("scenario_id").type().longType().noDefault()
- .name("run_id").type().intType().noDefault()
- .name("timestamp").type().longType().noDefault()
- .name("host_total_count").type().intType().noDefault()
- .name("host_available_count").type().intType().noDefault()
- .name("vm_total_count").type().intType().noDefault()
- .name("vm_active_count").type().intType().noDefault()
- .name("vm_inactive_count").type().intType().noDefault()
- .name("vm_waiting_count").type().intType().noDefault()
- .name("vm_failed_count").type().intType().noDefault()
- .endRecord()
-
-public class ParquetProvisionerMetricsWriter(path: File, batchSize: Int) :
- ParquetMetricsWriter<ProvisionerMetrics>(path, schema, batchSize) {
-
- override fun persist(action: Action.Write<ProvisionerMetrics>, row: GenericData.Record) {
- row.put("scenario_id", action.scenario)
- row.put("run_id", action.run)
- row.put("timestamp", action.metrics.time)
- row.put("host_total_count", action.metrics.totalHostCount)
- row.put("host_available_count", action.metrics.availableHostCount)
- row.put("vm_total_count", action.metrics.totalVmCount)
- row.put("vm_active_count", action.metrics.activeVmCount)
- row.put("vm_inactive_count", action.metrics.inactiveVmCount)
- row.put("vm_waiting_count", action.metrics.waitingVmCount)
- row.put("vm_failed_count", action.metrics.failedVmCount)
- }
-
- override fun toString(): String = "host-writer"
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
deleted file mode 100644
index 595b9777..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20.reporter
-
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
-import mu.KotlinLogging
-
-private val logger = KotlinLogging.logger {}
-
-class ExperimentPostgresReporter(
- val scenario: Long,
- val run: Int,
- val hostWriter: PostgresHostMetricsWriter,
- val provisionerWriter: PostgresProvisionerMetricsWriter
-) : ExperimentReporter {
- private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
-
- override fun reportVmStateChange(time: Long, server: Server) {}
-
- override fun reportHostStateChange(
- time: Long,
- driver: VirtDriver,
- server: Server
- ) {
- val lastServerState = lastServerStates[server]
- logger.debug("Host ${server.uid} changed state ${server.state} [$time]")
-
- if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = time - lastServerState.second
- reportHostSlice(
- time,
- 0,
- 0,
- 0,
- 0,
- 0.0,
- 0.0,
- 0,
- server,
- duration
- )
-
- lastServerStates.remove(server)
- lastPowerConsumption.remove(server)
- } else {
- lastServerStates[server] = Pair(server.state, time)
- }
- }
-
- private val lastPowerConsumption = mutableMapOf<Server, Double>()
-
- override fun reportPowerConsumption(host: Server, draw: Double) {
- lastPowerConsumption[host] = draw
- }
-
- override fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- numberOfDeployedImages: Int,
- hostServer: Server,
- duration: Long
- ) {
- hostWriter.write(
- scenario, run, HostMetrics(
- time,
- duration,
- hostServer,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0
- )
- )
- }
-
- override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
- provisionerWriter.write(
- scenario,
- run,
- ProvisionerMetrics(
- time,
- event.totalHostCount,
- event.availableHostCount,
- event.totalVmCount,
- event.activeVmCount,
- event.inactiveVmCount,
- event.waitingVmCount,
- event.failedVmCount
- )
- )
- }
-
- override fun close() {}
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
deleted file mode 100644
index 57e665ae..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20.reporter
-
-import de.bytefish.pgbulkinsert.row.SimpleRow
-import de.bytefish.pgbulkinsert.row.SimpleRowWriter
-import javax.sql.DataSource
-
-private val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
- "host_metrics",
- *arrayOf(
- "scenario_id",
- "run_id",
- "host_id",
- "state",
- "timestamp",
- "duration",
- "vm_count",
- "requested_burst",
- "granted_burst",
- "overcommissioned_burst",
- "interfered_burst",
- "cpu_usage",
- "cpu_demand",
- "power_draw"
- )
-)
-
-/**
- * A [PostgresMetricsWriter] for persisting [HostMetrics].
- */
-public class PostgresHostMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) :
- PostgresMetricsWriter<HostMetrics>(ds, table, parallelism, batchSize) {
-
- override fun persist(action: Action.Write<HostMetrics>, row: SimpleRow) {
- row.setLong("scenario_id", action.scenario)
- row.setInteger("run_id", action.run)
- row.setText("host_id", action.metrics.host.name)
- row.setText("state", action.metrics.host.state.name)
- row.setLong("timestamp", action.metrics.time)
- row.setLong("duration", action.metrics.duration)
- row.setInteger("vm_count", action.metrics.vmCount)
- row.setLong("requested_burst", action.metrics.requestedBurst)
- row.setLong("granted_burst", action.metrics.grantedBurst)
- row.setLong("overcommissioned_burst", action.metrics.overcommissionedBurst)
- row.setLong("interfered_burst", action.metrics.interferedBurst)
- row.setDouble("cpu_usage", action.metrics.cpuUsage)
- row.setDouble("cpu_demand", action.metrics.cpuDemand)
- row.setDouble("power_draw", action.metrics.powerDraw)
- }
-
- override fun toString(): String = "host-writer"
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
deleted file mode 100644
index bee01e51..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20.reporter
-
-import de.bytefish.pgbulkinsert.row.SimpleRow
-import de.bytefish.pgbulkinsert.row.SimpleRowWriter
-import org.postgresql.PGConnection
-import java.io.Closeable
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
-import javax.sql.DataSource
-
-/**
- * The experiment writer is a separate thread that is responsible for writing the results to the
- * database.
- */
-public abstract class PostgresMetricsWriter<T>(
- private val ds: DataSource,
- private val table: SimpleRowWriter.Table,
- private val parallelism: Int = 8,
- private val bufferSize: Int = 4096
-) : Runnable, Closeable {
- /**
- * The queue of commands to process.
- */
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(parallelism * bufferSize)
-
- /**
- * The executor service to use.
- */
- private val executorService = Executors.newFixedThreadPool(parallelism)
-
- /**
- * Write the specified metrics to the database.
- */
- public fun write(scenario: Long, run: Int, metrics: T) {
- queue.put(Action.Write(scenario, run, metrics))
- }
-
- /**
- * Signal the writer to stop.
- */
- public override fun close() {
- repeat(parallelism) {
- queue.put(Action.Stop)
- }
- executorService.shutdown()
- executorService.awaitTermination(5, TimeUnit.MINUTES)
- }
-
- /**
- * Persist the specified metrics to the given [row].
- */
- public abstract fun persist(action: Action.Write<T>, row: SimpleRow)
-
- init {
- repeat(parallelism) {
- executorService.submit { run() }
- }
- }
-
- /**
- * Start the writer thread.
- */
- override fun run() {
- val conn = ds.connection
-
- val writer = SimpleRowWriter(table)
- writer.open(conn.unwrap(PGConnection::class.java))
-
- try {
-
- loop@ while (true) {
- val action = queue.take()
- when (action) {
- is Action.Stop -> break@loop
- is Action.Write<*> -> writer.startRow {
- @Suppress("UNCHECKED_CAST")
- persist(action as Action.Write<T>, it)
- }
- }
- }
- } finally {
- writer.close()
- conn.close()
- }
- }
-
- sealed class Action {
- /**
- * A poison pill that will stop the writer thread.
- */
- object Stop : Action()
-
- /**
- * Write the specified metrics to the database.
- */
- data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action()
- }
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
deleted file mode 100644
index 17788112..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20.reporter
-
-import de.bytefish.pgbulkinsert.row.SimpleRow
-import de.bytefish.pgbulkinsert.row.SimpleRowWriter
-import javax.sql.DataSource
-
-private val table: SimpleRowWriter.Table = SimpleRowWriter.Table(
- "provisioner_metrics",
- *arrayOf(
- "scenario_id",
- "run_id",
- "timestamp",
- "host_total_count",
- "host_available_count",
- "vm_total_count",
- "vm_active_count",
- "vm_inactive_count",
- "vm_waiting_count",
- "vm_failed_count"
- )
-)
-
-/**
- * A [PostgresMetricsWriter] for persisting [ProvisionerMetrics].
- */
-public class PostgresProvisionerMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) :
- PostgresMetricsWriter<ProvisionerMetrics>(ds, table, parallelism, batchSize) {
-
- override fun persist(action: Action.Write<ProvisionerMetrics>, row: SimpleRow) {
- row.setLong("scenario_id", action.scenario)
- row.setInteger("run_id", action.run)
- row.setLong("timestamp", action.metrics.time)
- row.setInteger("host_total_count", action.metrics.totalHostCount)
- row.setInteger("host_available_count", action.metrics.availableHostCount)
- row.setInteger("vm_total_count", action.metrics.totalVmCount)
- row.setInteger("vm_active_count", action.metrics.activeVmCount)
- row.setInteger("vm_inactive_count", action.metrics.inactiveVmCount)
- row.setInteger("vm_waiting_count", action.metrics.waitingVmCount)
- row.setInteger("vm_failed_count", action.metrics.failedVmCount)
- }
-
- override fun toString(): String = "provisioner-writer"
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt
new file mode 100644
index 00000000..dac32586
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt
@@ -0,0 +1,68 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.supervisorScope
+
+/**
+ * An abstract [ExperimentDescriptor] specifically for containers.
+ */
+public abstract class ContainerExperimentDescriptor : ExperimentDescriptor() {
+ /**
+ * The child descriptors of this container.
+ */
+ public abstract val children: Sequence<ExperimentDescriptor>
+
+ override val type: Type = Type.CONTAINER
+
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val materializedChildren = children.toList()
+ for (child in materializedChildren) {
+ context.listener.descriptorRegistered(child)
+ }
+
+ supervisorScope {
+ for (child in materializedChildren) {
+ if (child.isTrial) {
+ launch {
+ val worker = context.scheduler.allocate()
+ context.listener.executionStarted(child)
+ try {
+ worker(child, context)
+ context.listener.executionFinished(child, ExperimentExecutionResult.Success)
+ } catch (e: Throwable) {
+ context.listener.executionFinished(child, ExperimentExecutionResult.Failed(e))
+ }
+ }
+ } else {
+ launch { child(context) }
+ }
+ }
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt
new file mode 100644
index 00000000..64b6b767
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt
@@ -0,0 +1,81 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import java.io.Serializable
+
+/**
+ * An immutable description of an experiment in the **odcsim* simulation framework, which may be a single atomic trial
+ * or a composition of multiple trials.
+ *
+ * This class represents a dynamic tree-like structure where the children of the nodes are not known at instantiation
+ * since they might be generated dynamically.
+ */
+public abstract class ExperimentDescriptor : Serializable {
+ /**
+ * The parent of this descriptor, or `null` if it has no parent.
+ */
+ public abstract val parent: ExperimentDescriptor?
+
+ /**
+ * The type of descriptor.
+ */
+ abstract val type: Type
+
+ /**
+ * A flag to indicate that this descriptor is a root descriptor.
+ */
+ public open val isRoot: Boolean
+ get() = parent == null
+
+ /**
+ * A flag to indicate that this descriptor describes an experiment trial.
+ */
+ val isTrial: Boolean
+ get() = type == Type.TRIAL
+
+ /**
+ * Execute this [ExperimentDescriptor].
+ *
+ * @param context The context to execute the descriptor in.
+ */
+ public abstract suspend operator fun invoke(context: ExperimentExecutionContext)
+
+ /**
+ * The types of experiment descriptors.
+ */
+ enum class Type {
+ /**
+ * A composition of multiple experiment descriptions whose invocation happens on a single thread.
+ */
+ CONTAINER,
+
+ /**
+ * An invocation of a single scenario of an experiment whose invocation may happen on different threads.
+ */
+ TRIAL
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt
new file mode 100644
index 00000000..77f970fe
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt
@@ -0,0 +1,51 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+
+/**
+ * An [ExperimentRunner] facilitates discovery and execution of experiments.
+ */
+public interface ExperimentRunner {
+ /**
+ * The unique identifier of this runner.
+ */
+ val id: String
+
+ /**
+ * The version of this runner.
+ */
+ val version: String?
+ get() = null
+
+ /**
+ * Execute the specified experiment represented as [ExperimentDescriptor].
+ *
+ * @param root The experiment to execute.
+ * @param listener The listener to report events to.
+ */
+ public fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener)
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt
index b2151b16..cf05416a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt
@@ -22,10 +22,11 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.runner
/**
- * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
- * same set of parameters.
+ * An abstract [ExperimentDescriptor] specifically for trials.
*/
-public data class Run(val scenario: Scenario, val id: Int, val seed: Int)
+public abstract class TrialExperimentDescriptor : ExperimentDescriptor() {
+ override val type: Type = Type.TRIAL
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt
new file mode 100644
index 00000000..9a04c491
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+/**
+ * The execution context of an experiment.
+ */
+public interface ExperimentExecutionContext {
+ /**
+ * The execution listener to use.
+ */
+ public val listener: ExperimentExecutionListener
+
+ /**
+ * The experiment scheduler to use.
+ */
+ public val scheduler: ExperimentScheduler
+
+ /**
+ * A cache for objects within a single runner.
+ */
+ public val cache: MutableMap<Any?, Any?>
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt
new file mode 100644
index 00000000..f6df0524
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+
+/**
+ * Listener to be notified of experiment execution events by experiment runners.
+ */
+interface ExperimentExecutionListener {
+ /**
+ * A method that is invoked when a new [ExperimentDescriptor] is registered.
+ */
+ fun descriptorRegistered(descriptor: ExperimentDescriptor)
+
+ /**
+ * A method that is invoked when when the execution of a leaf or subtree of the experiment tree has finished,
+ * regardless of the outcome.
+ */
+ fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult)
+
+ /**
+ * A method that is invoked when the execution of a leaf or subtree of the experiment tree is about to be started.
+ */
+ fun executionStarted(descriptor: ExperimentDescriptor)
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt
index 8f42cdd4..057e1f92 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt
@@ -22,19 +22,21 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20.reporter
+package com.atlarge.opendc.experiments.sc20.runner.execution
-import java.io.Closeable
-import javax.sql.DataSource
+import java.io.Serializable
-interface ExperimentReporterProvider : Closeable {
+/**
+ * The result of executing an experiment.
+ */
+public sealed class ExperimentExecutionResult : Serializable {
/**
- * Initialize the provider with the specified data source.
+ * The experiment executed successfully
*/
- public fun init(ds: DataSource) {}
+ public object Success : ExperimentExecutionResult()
/**
- * Create a reporter for a single run.
+ * The experiment failed during execution.
*/
- public fun createReporter(scenario: Long, run: Int): ExperimentReporter
+ public data class Failed(val throwable: Throwable) : ExperimentExecutionResult()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
new file mode 100644
index 00000000..0346a7f8
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
@@ -0,0 +1,59 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import java.io.Closeable
+
+/**
+ * A interface for scheduling the execution of experiment trials over compute resources (threads/containers/vms)
+ */
+interface ExperimentScheduler : Closeable {
+ /**
+ * Allocate a [Worker] for executing an experiment trial. This method may suspend in case no resources are directly
+ * available at the moment.
+ *
+ * @return The available worker.
+ */
+ suspend fun allocate(): ExperimentScheduler.Worker
+
+ /**
+ * An isolated worker of an [ExperimentScheduler] that is responsible for executing a single experiment trial.
+ */
+ interface Worker {
+ /**
+ * Dispatch the specified [ExperimentDescriptor] to execute some time in the future and return the results of
+ * the trial.
+ *
+ * @param descriptor The descriptor to execute.
+ * @param context The context to execute the descriptor in.
+ * @return The results of the experiment trial.
+ */
+ suspend operator fun invoke(
+ descriptor: ExperimentDescriptor,
+ context: ExperimentExecutionContext
+ ): ExperimentExecutionResult
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
new file mode 100644
index 00000000..31632b8c
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
@@ -0,0 +1,85 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import kotlinx.coroutines.asCoroutineDispatcher
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.supervisorScope
+import kotlinx.coroutines.sync.Semaphore
+import kotlinx.coroutines.withContext
+import java.util.concurrent.Executors
+
+/**
+ * An [ExperimentScheduler] that runs experiments using a local thread pool.
+ *
+ * @param parallelism The maximum amount of parallel workers (default is the number of available processors).
+ */
+class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().availableProcessors() + 1) : ExperimentScheduler {
+ private val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()
+ private val tickets = Semaphore(parallelism)
+
+ override suspend fun allocate(): ExperimentScheduler.Worker {
+ tickets.acquire()
+ return object : ExperimentScheduler.Worker {
+ override suspend fun invoke(
+ descriptor: ExperimentDescriptor,
+ context: ExperimentExecutionContext
+ ): ExperimentExecutionResult = supervisorScope {
+ val listener =
+ object : ExperimentExecutionListener {
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ launch { context.listener.descriptorRegistered(descriptor) }
+ }
+
+ override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) {
+ launch { context.listener.executionFinished(descriptor, result) }
+ }
+
+ override fun executionStarted(descriptor: ExperimentDescriptor) {
+ launch { context.listener.executionStarted(descriptor) }
+ }
+ }
+
+ val newContext = object : ExperimentExecutionContext by context {
+ override val listener: ExperimentExecutionListener = listener
+ }
+
+ try {
+ withContext(dispatcher) {
+ descriptor(newContext)
+ ExperimentExecutionResult.Success
+ }
+ } catch (e: Throwable) {
+ ExperimentExecutionResult.Failed(e)
+ } finally {
+ tickets.release()
+ }
+ }
+ }
+ }
+
+ override fun close() = dispatcher.close()
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
new file mode 100644
index 00000000..3b80276f
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
@@ -0,0 +1,62 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.internal
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentRunner
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler
+import kotlinx.coroutines.runBlocking
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * The default implementation of the [ExperimentRunner] interface.
+ *
+ * @param scheduler The scheduler to use.
+ */
+public class DefaultExperimentRunner(val scheduler: ExperimentScheduler) : ExperimentRunner {
+ override val id: String = "default"
+
+ override val version: String? = "1.0"
+
+ override fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) = runBlocking {
+ val context = object : ExperimentExecutionContext {
+ override val listener: ExperimentExecutionListener = listener
+ override val scheduler: ExperimentScheduler = this@DefaultExperimentRunner.scheduler
+ override val cache: MutableMap<Any?, Any?> = ConcurrentHashMap()
+ }
+
+ listener.descriptorRegistered(root)
+ context.listener.executionStarted(root)
+ try {
+ root(context)
+ context.listener.executionFinished(root, ExperimentExecutionResult.Success)
+ } catch (e: Throwable) {
+ context.listener.executionFinished(root, ExperimentExecutionResult.Failed(e))
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt
index 34505fce..c1e14e2a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt
@@ -22,14 +22,14 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.telemetry
/**
- * A portfolio represents a collection of scenarios are tested.
+ * An event that occurs within the system.
*/
-public abstract class Portfolio(val name: String) {
+public abstract class Event(val name: String) {
/**
- * The scenarios of this portfolio consists of.
+ * The time of occurrence of this event.
*/
- abstract val scenarios: Sequence<Scenario>
+ public abstract val timestamp: Long
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
index 061f6cce..8e91bca2 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
@@ -22,15 +22,15 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20.reporter
+package com.atlarge.opendc.experiments.sc20.telemetry
import com.atlarge.opendc.compute.core.Server
/**
* A periodic report of the host machine metrics.
*/
-data class HostMetrics(
- val time: Long,
+data class HostEvent(
+ override val timestamp: Long,
val duration: Long,
val host: Server,
val vmCount: Int,
@@ -41,4 +41,4 @@ data class HostMetrics(
val cpuUsage: Double,
val cpuDemand: Double,
val powerDraw: Double
-)
+) : Event("host-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt
index 966662cd..df619632 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt
@@ -22,13 +22,13 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20.reporter
+package com.atlarge.opendc.experiments.sc20.telemetry
/**
* A periodic report of the provisioner's metrics.
*/
-data class ProvisionerMetrics(
- val time: Long,
+data class ProvisionerEvent(
+ override val timestamp: Long,
val totalHostCount: Int,
val availableHostCount: Int,
val totalVmCount: Int,
@@ -36,4 +36,4 @@ data class ProvisionerMetrics(
val inactiveVmCount: Int,
val waitingVmCount: Int,
val failedVmCount: Int
-)
+) : Event("provisioner-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt
new file mode 100644
index 00000000..497d2c3f
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+data class RunEvent(
+ val run: Run,
+ override val timestamp: Long
+) : Event("run")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt
index 5f963206..7289fb21 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt
@@ -22,15 +22,15 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20.reporter
+package com.atlarge.opendc.experiments.sc20.telemetry
import com.atlarge.opendc.compute.core.Server
/**
* A periodic report of a virtual machine's metrics.
*/
-data class VmMetrics(
- val time: Long,
+data class VmEvent(
+ override val timestamp: Long,
val duration: Long,
val vm: Server,
val host: Server,
@@ -40,4 +40,4 @@ data class VmMetrics(
val interferedBurst: Long,
val cpuUsage: Double,
val cpuDemand: Double
-)
+) : Event("vm-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index e82e9e47..a69bd4b2 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -22,8 +22,9 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20.reporter
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+import com.atlarge.opendc.experiments.sc20.telemetry.Event
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
@@ -36,24 +37,35 @@ import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import kotlin.concurrent.thread
+/**
+ * The logging instance to use.
+ */
private val logger = KotlinLogging.logger {}
-public abstract class ParquetMetricsWriter<T>(
+/**
+ * A writer that writes events in Parquet format.
+ */
+public open class ParquetEventWriter<in T : Event>(
private val path: File,
private val schema: Schema,
+ private val converter: (T, GenericData.Record) -> Unit,
private val bufferSize: Int = 4096
) : Runnable, Closeable {
/**
* The queue of commands to process.
*/
private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
private val writerThread = thread(start = true, name = "parquet-writer") { run() }
/**
* Write the specified metrics to the database.
*/
- public fun write(scenario: Long, run: Int, metrics: T) {
- queue.put(Action.Write(scenario, run, metrics))
+ public fun write(event: T) {
+ queue.put(Action.Write(event))
}
/**
@@ -65,11 +77,6 @@ public abstract class ParquetMetricsWriter<T>(
}
/**
- * Persist the specified metrics to the given [row].
- */
- public abstract fun persist(action: Action.Write<T>, row: GenericData.Record)
-
- /**
* Start the writer thread.
*/
override fun run() {
@@ -88,7 +95,7 @@ public abstract class ParquetMetricsWriter<T>(
is Action.Write<*> -> {
val record = GenericData.Record(schema)
@Suppress("UNCHECKED_CAST")
- persist(action as Action.Write<T>, record)
+ converter(action.event as T, record)
writer.write(record)
}
}
@@ -109,6 +116,6 @@ public abstract class ParquetMetricsWriter<T>(
/**
* Write the specified metrics to the database.
*/
- data class Write<out T>(val scenario: Long, val run: Int, val metrics: T) : Action()
+ data class Write<out T : Event>(val event: T) : Action()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
new file mode 100644
index 00000000..7e5ad911
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -0,0 +1,81 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostEvent]s.
+ */
+public class ParquetHostEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ companion object {
+ val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
+ // record.put("portfolio_id", event.run.parent.parent.id)
+ // record.put("scenario_id", event.run.parent.id)
+ // record.put("run_id", event.run.id)
+ record.put("host_id", event.host.name)
+ record.put("state", event.host.state.name)
+ record.put("timestamp", event.timestamp)
+ record.put("duration", event.duration)
+ record.put("vm_count", event.vmCount)
+ record.put("requested_burst", event.requestedBurst)
+ record.put("granted_burst", event.grantedBurst)
+ record.put("overcommissioned_burst", event.overcommissionedBurst)
+ record.put("interfered_burst", event.interferedBurst)
+ record.put("cpu_usage", event.cpuUsage)
+ record.put("cpu_demand", event.cpuDemand)
+ record.put("power_draw", event.powerDraw)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("host_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ // .name("portfolio_id").type().intType().noDefault()
+ // .name("scenario_id").type().intType().noDefault()
+ // .name("run_id").type().intType().noDefault()
+ .name("timestamp").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("vm_count").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
new file mode 100644
index 00000000..1f3b0472
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [ProvisionerEvent]s.
+ */
+public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "provisioner-writer"
+
+ companion object {
+ val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
+ record.put("timestamp", event.timestamp)
+ record.put("host_total_count", event.totalHostCount)
+ record.put("host_available_count", event.availableHostCount)
+ record.put("vm_total_count", event.totalVmCount)
+ record.put("vm_active_count", event.activeVmCount)
+ record.put("vm_inactive_count", event.inactiveVmCount)
+ record.put("vm_waiting_count", event.waitingVmCount)
+ record.put("vm_failed_count", event.failedVmCount)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("provisioner_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_total_count").type().intType().noDefault()
+ .name("host_available_count").type().intType().noDefault()
+ .name("vm_total_count").type().intType().noDefault()
+ .name("vm_active_count").type().intType().noDefault()
+ .name("vm_inactive_count").type().intType().noDefault()
+ .name("vm_waiting_count").type().intType().noDefault()
+ .name("vm_failed_count").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
new file mode 100644
index 00000000..1549b8d2
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
@@ -0,0 +1,78 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [RunEvent]s.
+ */
+public class ParquetRunEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "run-writer"
+
+ companion object {
+ val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
+ val run = event.run
+ val scenario = run.parent
+ val portfolio = scenario.parent
+ record.put("portfolio_id", portfolio.id)
+ record.put("portfolio_name", portfolio.name)
+ record.put("scenario_id", scenario.id)
+ record.put("run_id", run.id)
+ record.put("repetitions", scenario.repetitions)
+ record.put("topology", scenario.topology.name)
+ record.put("workload_name", scenario.workload.name)
+ record.put("workload_fraction", scenario.workload.fraction)
+ record.put("allocation_policy", scenario.allocationPolicy)
+ record.put("failure_frequency", scenario.operationalPhenomena.failureFrequency)
+ record.put("interference", scenario.operationalPhenomena.hasInterference)
+ record.put("seed", run.seed)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("runs")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("portfolio_id").type().intType().noDefault()
+ .name("portfolio_name").type().stringType().noDefault()
+ .name("scenario_id").type().intType().noDefault()
+ .name("run_id").type().intType().noDefault()
+ .name("repetitions").type().intType().noDefault()
+ .name("topology").type().stringType().noDefault()
+ .name("workload_name").type().stringType().noDefault()
+ .name("workload_fraction").type().doubleType().noDefault()
+ .name("allocation_policy").type().stringType().noDefault()
+ .name("failure_frequency").type().doubleType().noDefault()
+ .name("interference").type().booleanType().noDefault()
+ .name("seed").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
index 28026fde..96b6b426 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -28,8 +28,7 @@ import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.experiments.sc20.Run
-import com.atlarge.opendc.experiments.sc20.sampleWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.Run
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import kotlin.random.Random
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
index 99634e1b..e03c59bc 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -22,9 +22,10 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.trace
import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.Run
import com.atlarge.opendc.format.trace.TraceEntry
import mu.KotlinLogging
import kotlin.random.Random
@@ -42,7 +43,7 @@ fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEnt
* Sample a regular (non-HPC) workload.
*/
fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, run: Run): List<TraceEntry<VmWorkload>> {
- val fraction = run.scenario.workload.fraction
+ val fraction = run.parent.workload.fraction
if (fraction >= 1) {
return trace
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt
deleted file mode 100644
index 4fcfdb6b..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20.util
-
-import com.atlarge.opendc.experiments.sc20.Portfolio
-import com.atlarge.opendc.experiments.sc20.Run
-import com.atlarge.opendc.experiments.sc20.Scenario
-import java.io.Closeable
-import java.sql.Connection
-import java.sql.Statement
-
-/**
- * A helper class for writing to the database.
- */
-class DatabaseHelper(val conn: Connection) : Closeable {
- /**
- * Prepared statement to create experiment.
- */
- private val createExperiment = conn.prepareStatement("INSERT INTO experiments DEFAULT VALUES", arrayOf("id"))
-
- /**
- * Prepared statement for creating a portfolio.
- */
- private val createPortfolio = conn.prepareStatement("INSERT INTO portfolios (experiment_id, name) VALUES (?, ?)", arrayOf("id"))
-
- /**
- * Prepared statement for creating a scenario
- */
- private val createScenario = conn.prepareStatement("INSERT INTO scenarios (portfolio_id, repetitions, topology, workload_name, workload_fraction, allocation_policy, failure_frequency, interference) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", arrayOf("id"))
-
- /**
- * Prepared statement for creating a run.
- */
- private val createRun = conn.prepareStatement("INSERT INTO runs (id, scenario_id, seed) VALUES (?, ?, ?)", arrayOf("id"))
-
- /**
- * Prepared statement for starting a run.
- */
- private val startRun = conn.prepareStatement("UPDATE runs SET state = 'active'::run_state,start_time = now() WHERE id = ? AND scenario_id = ?")
-
- /**
- * Prepared statement for finishing a run.
- */
- private val finishRun = conn.prepareStatement("UPDATE runs SET state = ?::run_state,end_time = now() WHERE id = ? AND scenario_id = ?")
-
- /**
- * Create a new experiment and return its id.
- */
- fun createExperiment(): Long {
- val affectedRows = createExperiment.executeUpdate()
- check(affectedRows != 0)
- return createExperiment.latestId
- }
-
- /**
- * Persist a [Portfolio] and return its id.
- */
- fun persist(portfolio: Portfolio, experimentId: Long): Long {
- createPortfolio.setLong(1, experimentId)
- createPortfolio.setString(2, portfolio.name)
-
- val affectedRows = createPortfolio.executeUpdate()
- check(affectedRows != 0)
- return createPortfolio.latestId
- }
-
- /**
- * Persist a [Scenario] and return its id.
- */
- fun persist(scenario: Scenario, portfolioId: Long): Long {
- createScenario.setLong(1, portfolioId)
- createScenario.setInt(2, scenario.repetitions)
- createScenario.setString(3, scenario.topology.name)
- createScenario.setString(4, scenario.workload.name)
- createScenario.setDouble(5, scenario.workload.fraction)
- createScenario.setString(6, scenario.allocationPolicy)
- createScenario.setDouble(7, scenario.failureFrequency)
- createScenario.setBoolean(8, scenario.hasInterference)
-
- val affectedRows = createScenario.executeUpdate()
- check(affectedRows != 0)
- return createScenario.latestId
- }
-
- /**
- * Persist a [Run] and return its id.
- */
- fun persist(run: Run, scenarioId: Long): Int {
- createRun.setInt(1, run.id)
- createRun.setLong(2, scenarioId)
- createRun.setInt(3, run.seed)
-
- val affectedRows = createRun.executeUpdate()
- check(affectedRows != 0)
- return createRun.latestId.toInt()
- }
-
- /**
- * Start run.
- */
- fun startRun(scenario: Long, run: Int) {
- startRun.setInt(1, run)
- startRun.setLong(2, scenario)
-
- val affectedRows = startRun.executeUpdate()
- check(affectedRows != 0)
- }
-
- /**
- * Finish a run.
- */
- fun finishRun(scenario: Long, run: Int, hasFailed: Boolean) {
- finishRun.setString(1, if (hasFailed) "fail" else "ok")
- finishRun.setInt(2, run)
- finishRun.setLong(3, scenario)
-
- val affectedRows = finishRun.executeUpdate()
- check(affectedRows != 0)
- }
-
- /**
- * Obtain the latest identifier of the specified statement.
- */
- private val Statement.latestId: Long
- get() {
- val rs = generatedKeys
- return try {
- check(rs.next())
- rs.getLong(1)
- } finally {
- rs.close()
- }
- }
-
- override fun close() {
- conn.close()
- }
-}
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index fd617115..ae3d6db1 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -31,7 +31,12 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
-import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
+import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
+import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
+import com.atlarge.opendc.experiments.sc20.experiment.createTraceReader
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.processTrace
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
@@ -96,19 +101,33 @@ class Sc20IntegrationTest {
lateinit var scheduler: SimpleVirtProvisioningService
root.launch {
- val res = createProvisioner(root, environmentReader, allocationPolicy)
+ val res = createProvisioner(
+ root,
+ environmentReader,
+ allocationPolicy
+ )
val bareMetalProvisioner = res.first
scheduler = res.second
val failureDomain = if (failures) {
println("ENABLING failures")
- createFailureDomain(seed, 24.0 * 7, bareMetalProvisioner, chan)
+ createFailureDomain(
+ seed,
+ 24.0 * 7,
+ bareMetalProvisioner,
+ chan
+ )
} else {
null
}
attachMonitor(scheduler, monitor)
- processTrace(traceReader, scheduler, chan, monitor)
+ processTrace(
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
@@ -141,7 +160,12 @@ class Sc20IntegrationTest {
val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
.construct()
- return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0)
+ return createTraceReader(
+ File("src/test/resources/trace"),
+ performanceInterferenceModel,
+ emptyList(),
+ 0
+ )
}
/**
@@ -152,7 +176,7 @@ class Sc20IntegrationTest {
return Sc20ClusterEnvironmentReader(stream)
}
- class TestExperimentReporter : ExperimentReporter {
+ class TestExperimentReporter : ExperimentMonitor {
var totalRequestedBurst = 0L
var totalGrantedBurst = 0L
var totalOvercommissionedBurst = 0L