summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-13 01:28:18 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-13 01:28:18 +0200
commit924179b45b4e7e1ac848fd852fe39d927ca0d85a (patch)
tree1c9fe231922a5df7e7205fdc7e3f00b996fb60f8
parent24fd4828d6798c19476543fa16df87d45811b54e (diff)
feat: Add experiment orchestrator in Kotlin
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts1
-rw-r--r--opendc/opendc-experiments-sc20/schema.sql75
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt202
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt219
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt35
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt163
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt31
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt129
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt30
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt30
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt31
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt)0
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt160
13 files changed, 895 insertions, 211 deletions
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index df291039..b7440792 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -46,6 +46,7 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
+ implementation("com.zaxxer:HikariCP:3.4.5")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
runtimeOnly("org.postgresql:postgresql:42.2.12")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql
index 677240fa..515348c6 100644
--- a/opendc/opendc-experiments-sc20/schema.sql
+++ b/opendc/opendc-experiments-sc20/schema.sql
@@ -1,13 +1,26 @@
--- A portfolio represents a collection of scenarios are tested.
-DROP TABLE IF EXISTS portfolios;
+-- An experiment represents a collection of portfolios.
+DROP TABLE IF EXISTS experiments CASCADE;
+CREATE TABLE experiments
+(
+ id BIGSERIAL PRIMARY KEY NOT NULL,
+ creation_time TIMESTAMP NOT NULL DEFAULT (now())
+);
+
+-- A portfolio represents a collection of scenarios tested.
+DROP TABLE IF EXISTS portfolios CASCADE;
CREATE TABLE portfolios
(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- name TEXT NOT NULL
+ id BIGSERIAL PRIMARY KEY NOT NULL,
+ experiment_id BIGINT NOT NULL,
+ name TEXT NOT NULL,
+
+ FOREIGN KEY (experiment_id) REFERENCES experiments (id)
+ ON DELETE CASCADE
+ ON UPDATE CASCADE
);
-- A scenario represents a single point in the design space (a unique combination of parameters)
-DROP TABLE IF EXISTS scenarios;
+DROP TABLE IF EXISTS scenarios CASCADE;
CREATE TABLE scenarios
(
id BIGSERIAL PRIMARY KEY NOT NULL,
@@ -17,27 +30,30 @@ CREATE TABLE scenarios
workload_name TEXT NOT NULL,
workload_fraction DOUBLE PRECISION NOT NULL,
allocation_policy TEXT NOT NULL,
- failures BIT NOT NULL,
- interference BIT NOT NULL,
+ failures BOOLEAN NOT NULL,
+ interference BOOLEAN NOT NULL,
FOREIGN KEY (portfolio_id) REFERENCES portfolios (id)
ON DELETE CASCADE
ON UPDATE CASCADE
+
);
+DROP TYPE IF EXISTS run_state CASCADE;
CREATE TYPE run_state AS ENUM ('wait', 'active', 'fail', 'ok');
-- An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
-- same set of parameters.
-DROP TABLE IF EXISTS runs;
+DROP TABLE IF EXISTS runs CASCADE;
CREATE TABLE runs
(
- id INTEGER NOT NULL,
- scenario_id BIGINT NOT NULL,
- seed INTEGER NOT NULL,
- state run_state NOT NULL DEFAULT 'wait'::run_state,
- start_time TIMESTAMP NOT NULL,
- end_time TIMESTAMP,
+ id INTEGER NOT NULL,
+ scenario_id BIGINT NOT NULL,
+ seed INTEGER NOT NULL,
+ state run_state NOT NULL DEFAULT 'wait'::run_state,
+ submit_time TIMESTAMP NOT NULL DEFAULT (now()),
+ start_time TIMESTAMP DEFAULT (NULL),
+ end_time TIMESTAMP DEFAULT (NULL),
PRIMARY KEY (scenario_id, id),
FOREIGN KEY (scenario_id) REFERENCES scenarios (id)
@@ -46,7 +62,7 @@ CREATE TABLE runs
);
-- Metrics of the hypervisors reported per slice
-DROP TABLE IF EXISTS host_metrics;
+DROP TABLE IF EXISTS host_metrics CASCADE;
CREATE TABLE host_metrics
(
id BIGSERIAL PRIMARY KEY NOT NULL,
@@ -70,10 +86,11 @@ CREATE TABLE host_metrics
ON UPDATE CASCADE
);
+DROP INDEX IF EXISTS host_metrics_idx;
CREATE INDEX host_metrics_idx ON host_metrics (scenario_id, run_id, timestamp, host_id);
-- Metrics of the VMs reported per slice
-DROP TABLE IF EXISTS vm_metrics;
+DROP TABLE IF EXISTS vm_metrics CASCADE;
CREATE TABLE vm_metrics
(
id BIGSERIAL PRIMARY KEY NOT NULL,
@@ -96,27 +113,29 @@ CREATE TABLE vm_metrics
ON UPDATE CASCADE
);
+DROP INDEX IF EXISTS vm_metrics_idx;
CREATE INDEX vm_metrics_idx ON vm_metrics (scenario_id, run_id, timestamp, vm_id);
-- Metrics of the provisioner reported per change
-DROP TABLE IF EXISTS provisioner_metrics;
+DROP TABLE IF EXISTS provisioner_metrics CASCADE;
CREATE TABLE provisioner_metrics
(
- id BIGSERIAL PRIMARY KEY NOT NULL,
- scenario_id BIGINT NOT NULL,
- run_id INTEGER NOT NULL,
- timestamp TIMESTAMP NOT NULL,
- host_total_count INTEGER NOT NULL,
- host_available_count INTEGER NOT NULL,
- vm_total_count INTEGER NOT NULL,
- vm_active_count INTEGER NOT NULL,
- vm_inactive_count INTEGER NOT NULL,
- vm_waiting_count INTEGER NOT NULL,
- vm_failed_count INTEGER NOT NULL,
+ id BIGSERIAL PRIMARY KEY NOT NULL,
+ scenario_id BIGINT NOT NULL,
+ run_id INTEGER NOT NULL,
+ timestamp TIMESTAMP NOT NULL,
+ host_total_count INTEGER NOT NULL,
+ host_available_count INTEGER NOT NULL,
+ vm_total_count INTEGER NOT NULL,
+ vm_active_count INTEGER NOT NULL,
+ vm_inactive_count INTEGER NOT NULL,
+ vm_waiting_count INTEGER NOT NULL,
+ vm_failed_count INTEGER NOT NULL,
FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
+DROP INDEX IF EXISTS provisioner_metrics_idx;
CREATE INDEX provisioner_metrics_idx ON provisioner_metrics (scenario_id, run_id, timestamp);
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
new file mode 100644
index 00000000..a9429367
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -0,0 +1,202 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.asCoroutineDispatcher
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import mu.KotlinLogging
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicInteger
+import javax.sql.DataSource
+import kotlin.random.Random
+import kotlin.system.measureTimeMillis
+
+/**
+ * The logger for the experiment runner.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * The experiment runner is responsible for orchestrating the simulation runs of an experiment.
+ *
+ * @param portfolios The portfolios to consider.
+ * @param ds The data source to write the experimental results to.
+ */
+public class ExperimentRunner(
+ private val portfolios: List<Portfolio>,
+ private val ds: DataSource,
+ private val reporterProvider: ExperimentReporterProvider,
+ private val environmentPath: File,
+ private val tracePath: File,
+ private val performanceInterferenceModel: PerformanceInterferenceModel
+) : Closeable {
+ /**
+ * The database helper to write the execution plan.
+ */
+ private val helper = DatabaseHelper(ds.connection)
+
+ /**
+ * The experiment identifier.
+ */
+ private var experimentId = -1L
+
+ /**
+ * The mapping of portfolios to their ids.
+ */
+ private val portfolioIds = mutableMapOf<Portfolio, Long>()
+
+ /**
+ * The mapping of scenarios to their ids.
+ */
+ private val scenarioIds = mutableMapOf<Scenario, Long>()
+
+ /**
+ * Create an execution plan
+ */
+ private fun createPlan(): List<Run> {
+ val runs = mutableListOf<Run>()
+
+ for (portfolio in portfolios) {
+ val portfolioId = helper.persist(portfolio, experimentId)
+ portfolioIds[portfolio] = portfolioId
+ var scenarios = 0
+ var runCount = 0
+
+ for (scenario in portfolio.scenarios) {
+ val scenarioId = helper.persist(scenario, portfolioId)
+ scenarioIds[scenario] = scenarioId
+ scenarios++
+
+ for (run in scenario.runs) {
+ helper.persist(run, scenarioId)
+ runCount++
+ runs.add(run)
+ }
+ }
+
+ logger.info { "Portfolio $portfolioId: ${portfolio.name} ($scenarios scenarios, $runCount runs total)" }
+ }
+
+ return runs
+ }
+
+ /**
+ * Create a trace reader for the specified trace.
+ */
+ private fun createTraceReader(
+ name: String,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ seed: Int
+ ): TraceReader<VmWorkload> {
+ return Sc20ParquetTraceReader(
+ File(tracePath, name),
+ performanceInterferenceModel,
+ emptyList(),
+ Random(seed)
+ )
+ }
+
+ /**
+ * Create the environment reader for the specified environment.
+ */
+ private fun createEnvironmentReader(name: String): EnvironmentReader {
+ return Sc20ClusterEnvironmentReader(File(environmentPath, "$name.txt"))
+ }
+
+ /**
+ * Run the specified run.
+ */
+ private fun run(run: Run) {
+ val reporter = reporterProvider.createReporter(ds, experimentId)
+ val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run.seed)
+ val environmentReader = createEnvironmentReader(run.scenario.topology.name)
+ run.scenario(run, reporter, environmentReader, traceReader)
+ }
+
+ /**
+ * Run the portfolios.
+ */
+ @OptIn(ExperimentalStdlibApi::class)
+ public fun run() {
+ experimentId = helper.createExperiment()
+ logger.info { "Creating execution plan for experiment $experimentId" }
+
+ val plan = createPlan()
+ val total = plan.size
+ val finished = AtomicInteger()
+ val dispatcher = Executors.newWorkStealingPool().asCoroutineDispatcher()
+
+ runBlocking {
+ val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!!
+ for (run in plan) {
+ val scenarioId = scenarioIds[run.scenario]!!
+ launch(dispatcher) {
+ launch(mainDispatcher) {
+ helper.startRun(scenarioId, run.id)
+ }
+
+ logger.info { "[${finished.get()}/$total] Starting run ($scenarioId, ${run.id})" }
+
+ try {
+
+ val duration = measureTimeMillis {
+ run(run)
+ }
+
+ finished.incrementAndGet()
+ logger.info { "[${finished.get()}/$total] Finished run ($scenarioId, ${run.id}) in $duration milliseconds" }
+
+ withContext(mainDispatcher) {
+ helper.finishRun(scenarioId, run.id, hasFailed = false)
+ }
+ } catch (e: Throwable) {
+ logger.error("A run has failed", e)
+ finished.incrementAndGet()
+ withContext(mainDispatcher) {
+ helper.finishRun(scenarioId, run.id, hasFailed = true)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ override fun close() {
+ helper.close()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index b2fbba39..80a91dec 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -24,52 +24,27 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
-import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
-import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import com.atlarge.opendc.format.trace.TraceReader
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.default
import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions
import com.github.ajalt.clikt.parameters.groups.required
import com.github.ajalt.clikt.parameters.options.convert
import com.github.ajalt.clikt.parameters.options.default
import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.flag
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.int
-import com.github.ajalt.clikt.parameters.types.long
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import com.zaxxer.hikari.HikariDataSource
import mu.KotlinLogging
import java.io.File
-import java.io.FileReader
import java.io.InputStream
-import java.sql.DriverManager
-import java.util.ServiceLoader
-import kotlin.random.Random
+import javax.sql.DataSource
/**
* The logger for this experiment.
@@ -80,10 +55,15 @@ private val logger = KotlinLogging.logger {}
* Represents the command for running the experiment.
*/
class ExperimentCli : CliktCommand(name = "sc20-experiment") {
- private val environment by option("--environment-file", help = "path to the environment file")
- .file()
+ private val environmentPath by option("--environment-path", help = "path to the environment directory")
+ .file(canBeFile = false)
+ .required()
+
+ private val tracePath by option("--trace-path", help = "path to the traces directory")
+ .file(canBeFile = false)
.required()
- private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file")
+
+ private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file")
.file()
.convert { it.inputStream() as InputStream }
.defaultLazy { ExperimentCli::class.java.getResourceAsStream("/env/performance-interference.json") }
@@ -95,180 +75,53 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
}
.default(emptyMap())
- private val selectedVms by mutuallyExclusiveOptions(
- option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) },
- option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) }
- ).default(emptyList())
-
- private val seed by option(help = "the random seed")
- .int()
- .default(0)
- private val failures by option("-x", "--failures", help = "enable (correlated) machine failures")
- .flag()
- private val failureInterval by option(help = "expected number of hours between failures")
- .int()
- .default(24 * 7) // one week
- private val allocationPolicy by option(help = "name of VM allocation policy to use")
- .choice(
- "mem", "mem-inv",
- "core-mem", "core-mem-inv",
- "active-servers", "active-servers-inv",
- "provisioned-cores", "provisioned-cores-inv",
- "random", "replay"
- )
- .default("core-mem")
-
- private val trace by option().groupChoice(
- "sc20-parquet" to Trace.Sc20Parquet()
- ).required()
-
private val reporter by option().groupChoice(
"parquet" to Reporter.Parquet(),
"postgres" to Reporter.Postgres()
).required()
- private fun parseVMs(string: String): List<String> {
- // Handle case where VM list contains a VM name with an (escaped) single-quote in it
- val sanitizedString = string.replace("\\'", "\\\\[")
- .replace("'", "\"")
- .replace("\\\\[", "'")
- val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString)
- return vms
- }
+ private val jdbcUrl by option("--jdbc-url", help = "JDBC connection url").required()
override fun run() {
- logger.info("seed: $seed")
- logger.info("failures: $failures")
- logger.info("allocation-policy: $allocationPolicy")
-
- val start = System.currentTimeMillis()
- val reporter: ExperimentReporter = reporter.createReporter()
-
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
-
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = when (this.allocationPolicy) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seed))
- "replay" -> ReplayAllocationPolicy(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}")
- }
-
- val performanceInterferenceModel = try {
- Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct()
- } catch (e: Throwable) {
- reporter.close()
- throw e
- }
- val environmentReader = Sc20ClusterEnvironmentReader(environment)
- val traceReader = try {
- trace.createTraceReader(performanceInterferenceModel, selectedVms, seed)
- } catch (e: Throwable) {
- reporter.close()
- throw e
- }
-
- root.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
-
- val failureDomain = if (failures) {
- logger.info("ENABLING failures")
- createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan)
- } else {
- null
- }
-
- attachMonitor(scheduler, reporter)
- processTrace(traceReader, scheduler, chan, reporter, vmPlacements)
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
+ val ds = HikariDataSource()
+ ds.jdbcUrl = jdbcUrl
+ ds.addDataSourceProperty("reWriteBatchedInserts", "true")
+
+ val portfolios = listOf(
+ HorVerPortfolio // ,
+ // MoreVelocityPortfolio,
+ // MoreHpcPortfolio,
+ // OperationalPhenomenaPortfolio
+ )
- failureDomain?.cancel()
- scheduler.terminate()
- logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds")
- }
+ val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
+ .construct()
- runBlocking {
- system.run()
- system.terminate()
+ try {
+ val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel)
+ runner.run()
+ } finally {
+ ds.close()
}
-
- // Explicitly close the monitor to flush its buffer
- reporter.close()
}
}
/**
* An option for specifying the type of reporter to use.
*/
-internal sealed class Reporter(name: String) : OptionGroup(name) {
- /**
- * Create the [ExperimentReporter] for this option.
- */
- abstract fun createReporter(): ExperimentReporter
-
+internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider {
class Parquet : Reporter("Options for reporting using Parquet") {
- private val path by option("--parquet-path", help = "path to where the output should be stored")
+ private val path by option("--parquet-directory", help = "path to where the output should be stored")
.file()
- .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") }
+ .defaultLazy { File("data") }
- override fun createReporter(): ExperimentReporter =
- ExperimentParquetReporter(path)
+ override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter =
+ ExperimentParquetReporter(File(path, "results-${System.currentTimeMillis()}.parquet"))
}
class Postgres : Reporter("Options for reporting using PostgreSQL") {
- private val url by option("--postgres-url", help = "JDBC connection url").required()
- private val experimentId by option(help = "Experiment ID").long().required()
-
- override fun createReporter(): ExperimentReporter {
- val conn = DriverManager.getConnection(url)
- return ExperimentPostgresReporter(conn, experimentId)
- }
- }
-}
-
-/**
- * An option for specifying the type of trace to use.
- */
-internal sealed class Trace(type: String) : OptionGroup(type) {
- /**
- * Create a [TraceReader] for this type of trace.
- */
- abstract fun createTraceReader(performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): TraceReader<VmWorkload>
-
- class Sc20Parquet : Trace("SC20 Parquet format") {
- /**
- * Path to trace directory.
- */
- private val path by option("--trace-path", help = "path to the trace directory")
- .file(canBeFile = false)
- .required()
-
- override fun createTraceReader(
- performanceInterferenceModel: PerformanceInterferenceModel,
- vms: List<String>,
- seed: Int
- ): TraceReader<VmWorkload> {
- return Sc20ParquetTraceReader(
- path,
- performanceInterferenceModel,
- vms,
- Random(seed)
- )
- }
+ override fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter =
+ ExperimentPostgresReporter(ds.connection, experimentId)
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt
new file mode 100644
index 00000000..34505fce
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+/**
+ * A portfolio represents a collection of scenarios are tested.
+ */
+public abstract class Portfolio(val name: String) {
+ /**
+ * The scenarios of this portfolio consists of.
+ */
+ abstract val scenarios: Sequence<Scenario>
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
new file mode 100644
index 00000000..0e612ae0
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
@@ -0,0 +1,163 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) {
+ abstract val topologies: List<Topology>
+ abstract val workloads: List<Workload>
+ abstract val operationalPhenomena: List<Pair<Boolean, Boolean>>
+ abstract val allocationPolicies: List<String>
+
+ open val repetitions = 2
+
+ override val scenarios: Sequence<Scenario> = sequence {
+ for (topology in topologies) {
+ for (workload in workloads) {
+ for ((hasFailures, hasInterference) in operationalPhenomena) {
+ for (allocationPolicy in allocationPolicies) {
+ yield(
+ Scenario(
+ this@AbstractSc20Portfolio,
+ repetitions,
+ topology,
+ workload,
+ allocationPolicy,
+ hasFailures,
+ hasInterference
+ )
+ )
+ }
+ }
+ }
+ }
+ }
+}
+
+object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("rep-vol-hor-hom"),
+ Topology("rep-vol-hor-het"),
+ Topology("rep-vol-ver-hom"),
+ Topology("rep-vol-ver-het"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-hor-het"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vol-ver-het")
+ )
+
+ override val workloads = listOf(
+ // Workload("solvinity", 0.1),
+ // Workload("solvinity", 0.25),
+ Workload("small-parquet", 0.5),
+ Workload("small-parquet", 1.0)
+ )
+
+ override val operationalPhenomena = listOf(
+ true to true
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("rep-vel-ver-hom"),
+ Topology("rep-vel-ver-het"),
+ Topology("exp-vel-ver-hom"),
+ Topology("exp-vel-ver-het")
+ )
+
+ override val workloads = listOf(
+ // Workload("solvinity", 0.1),
+ // Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena = listOf(
+ true to true
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+object MoreHpcPortfolio : AbstractSc20Portfolio("more_velocity") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workloads = listOf(
+ // Workload("solvinity", 0.1),
+ // Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena = listOf(
+ true to true
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("more_velocity") {
+ override val topologies = listOf(
+ Topology("base")
+ )
+
+ override val workloads = listOf(
+ // Workload("solvinity", 0.1),
+ // Workload("solvinity", 0.25),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomena = listOf(
+ true to true,
+ false to true,
+ true to false,
+ true to true
+ )
+
+ override val allocationPolicies = listOf(
+ "mem",
+ "mem-inv",
+ "core-mem",
+ "core-mem-inv",
+ "active-servers",
+ "active-servers-inv",
+ "random"
+ )
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt
new file mode 100644
index 00000000..b2151b16
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt
@@ -0,0 +1,31 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+/**
+ * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
+ * same set of parameters.
+ */
+public data class Run(val scenario: Scenario, val id: Int, val seed: Int)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt
new file mode 100644
index 00000000..66a8babf
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt
@@ -0,0 +1,129 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import mu.KotlinLogging
+import java.util.ServiceLoader
+import kotlin.random.Random
+
+/**
+ * The logger for the experiment scenario.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * The provider for the simulation engine to use.
+ */
+private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+
+/**
+ * A scenario represents a single point in the design space (a unique combination of parameters).
+ */
+public class Scenario(
+ val portfolio: Portfolio,
+ val repetitions: Int,
+ val topology: Topology,
+ val workload: Workload,
+ val allocationPolicy: String,
+ val hasFailures: Boolean,
+ val hasInterference: Boolean,
+ val failureInterval: Int = 24 * 7
+) {
+ /**
+ * The runs this scenario consists of.
+ */
+ public val runs: Sequence<Run> = sequence {
+ repeat(repetitions) { i ->
+ yield(Run(this@Scenario, i, i))
+ }
+ }
+
+ /**
+ * Perform a single run of this scenario.
+ */
+ public operator fun invoke(run: Run, reporter: ExperimentReporter, environment: EnvironmentReader, trace: TraceReader<VmWorkload>) {
+ val system = provider("experiment-${run.id}")
+ val root = system.newDomain("root")
+ val seeder = Random(run.seed)
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = when (this.allocationPolicy) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ "replay" -> ReplayAllocationPolicy(emptyMap())
+ else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}")
+ }
+
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(root, environment, allocationPolicy)
+
+ val failureDomain = if (hasFailures) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(seeder.nextInt(), failureInterval, bareMetalProvisioner, chan)
+ } else {
+ null
+ }
+
+ attachMonitor(scheduler, reporter)
+ processTrace(trace, scheduler, chan, reporter, emptyMap())
+
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ runBlocking {
+ system.run()
+ system.terminate()
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt
new file mode 100644
index 00000000..d2be9599
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt
@@ -0,0 +1,30 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+/**
+ * The datacenter topology on which we test the workload.
+ */
+public data class Topology(val name: String)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt
new file mode 100644
index 00000000..4ab5ec8c
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt
@@ -0,0 +1,30 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+/**
+ * A workload that is considered for a scenario.
+ */
+public class Workload(val name: String, val fraction: Double)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt
new file mode 100644
index 00000000..d0dfd2e8
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt
@@ -0,0 +1,31 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import javax.sql.DataSource
+
+interface ExperimentReporterProvider {
+ public fun createReporter(ds: DataSource, experimentId: Long): ExperimentReporter
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index 8a204ca3..8a204ca3 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt
new file mode 100644
index 00000000..0292a2e3
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt
@@ -0,0 +1,160 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.util
+
+import com.atlarge.opendc.experiments.sc20.Portfolio
+import com.atlarge.opendc.experiments.sc20.Run
+import com.atlarge.opendc.experiments.sc20.Scenario
+import java.io.Closeable
+import java.sql.Connection
+import java.sql.Statement
+
+/**
+ * A helper class for writing to the database.
+ */
+class DatabaseHelper(val conn: Connection) : Closeable {
+ /**
+ * Prepared statement to create experiment.
+ */
+ private val createExperiment = conn.prepareStatement("INSERT INTO experiments DEFAULT VALUES", arrayOf("id"))
+
+ /**
+ * Prepared statement for creating a portfolio.
+ */
+ private val createPortfolio = conn.prepareStatement("INSERT INTO portfolios (experiment_id, name) VALUES (?, ?)", arrayOf("id"))
+
+ /**
+ * Prepared statement for creating a scenario
+ */
+ private val createScenario = conn.prepareStatement("INSERT INTO scenarios (portfolio_id, repetitions, topology, workload_name, workload_fraction, allocation_policy, failures, interference) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", arrayOf("id"))
+
+ /**
+ * Prepared statement for creating a run.
+ */
+ private val createRun = conn.prepareStatement("INSERT INTO runs (id, scenario_id, seed) VALUES (?, ?, ?)", arrayOf("id"))
+
+ /**
+ * Prepared statement for starting a run.
+ */
+ private val startRun = conn.prepareStatement("UPDATE runs SET state = 'active'::run_state,start_time = now() WHERE id = ? AND scenario_id = ?")
+
+ /**
+ * Prepared statement for finishing a run.
+ */
+ private val finishRun = conn.prepareStatement("UPDATE runs SET state = ?::run_state,end_time = now() WHERE id = ? AND scenario_id = ?")
+
+ /**
+ * Create a new experiment and return its id.
+ */
+ fun createExperiment(): Long {
+ val affectedRows = createExperiment.executeUpdate()
+ check(affectedRows != 0)
+ return createExperiment.latestId
+ }
+
+ /**
+ * Persist a [Portfolio] and return its id.
+ */
+ fun persist(portfolio: Portfolio, experimentId: Long): Long {
+ createPortfolio.setLong(1, experimentId)
+ createPortfolio.setString(2, portfolio.name)
+
+ val affectedRows = createPortfolio.executeUpdate()
+ check(affectedRows != 0)
+ return createPortfolio.latestId
+ }
+
+ /**
+ * Persist a [Scenario] and return its id.
+ */
+ fun persist(scenario: Scenario, portfolioId: Long): Long {
+ createScenario.setLong(1, portfolioId)
+ createScenario.setInt(2, scenario.repetitions)
+ createScenario.setString(3, scenario.topology.name)
+ createScenario.setString(4, scenario.workload.name)
+ createScenario.setDouble(5, scenario.workload.fraction)
+ createScenario.setString(6, scenario.allocationPolicy)
+ createScenario.setBoolean(7, scenario.hasFailures)
+ createScenario.setBoolean(8, scenario.hasInterference)
+
+ val affectedRows = createScenario.executeUpdate()
+ check(affectedRows != 0)
+ return createScenario.latestId
+ }
+
+ /**
+ * Persist a [Run] and return its id.
+ */
+ fun persist(run: Run, scenarioId: Long): Int {
+ createRun.setInt(1, run.id)
+ createRun.setLong(2, scenarioId)
+ createRun.setInt(3, run.seed)
+
+ val affectedRows = createRun.executeUpdate()
+ check(affectedRows != 0)
+ return createRun.latestId.toInt()
+ }
+
+ /**
+ * Start run.
+ */
+ fun startRun(scenario: Long, run: Int) {
+ startRun.setInt(1, run)
+ startRun.setLong(2, scenario)
+
+ val affectedRows = startRun.executeUpdate()
+ check(affectedRows != 0)
+ }
+
+ /**
+ * Finish a run.
+ */
+ fun finishRun(scenario: Long, run: Int, hasFailed: Boolean) {
+ finishRun.setString(1, if (hasFailed) "fail" else "ok")
+ finishRun.setInt(2, run)
+ finishRun.setLong(3, scenario)
+
+ val affectedRows = finishRun.executeUpdate()
+ check(affectedRows != 0)
+ }
+
+ /**
+ * Obtain the latest identifier of the specified statement.
+ */
+ private val Statement.latestId: Long
+ get() {
+ val rs = generatedKeys
+ return try {
+ check(rs.next())
+ rs.getLong(1)
+ } finally {
+ rs.close()
+ }
+ }
+
+ override fun close() {
+ conn.close()
+ }
+}