From 287d85732a8bcd5d85a8628006828fa460baaff9 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 15 May 2020 02:18:45 +0200 Subject: refactor: Move entirely to Parquet --- opendc/opendc-experiments-sc20/build.gradle.kts | 13 +- opendc/opendc-experiments-sc20/schema.sql | 141 ----------- .../opendc/experiments/sc20/ExperimentHelpers.kt | 258 -------------------- .../opendc/experiments/sc20/ExperimentRunner.kt | 216 ----------------- .../opendc/experiments/sc20/ExperimentRunnerCli.kt | 219 ----------------- .../com/atlarge/opendc/experiments/sc20/Main.kt | 157 ++++++++++++ .../atlarge/opendc/experiments/sc20/Portfolio.kt | 35 --- .../atlarge/opendc/experiments/sc20/Portfolios.kt | 165 ------------- .../com/atlarge/opendc/experiments/sc20/Run.kt | 31 --- .../atlarge/opendc/experiments/sc20/Scenario.kt | 128 ---------- .../atlarge/opendc/experiments/sc20/Topology.kt | 30 --- .../atlarge/opendc/experiments/sc20/Workload.kt | 30 --- .../opendc/experiments/sc20/WorkloadSampler.kt | 68 ------ .../experiments/sc20/experiment/Experiment.kt | 78 ++++++ .../sc20/experiment/ExperimentHelpers.kt | 264 +++++++++++++++++++++ .../experiments/sc20/experiment/Portfolio.kt | 90 +++++++ .../experiments/sc20/experiment/Portfolios.kt | 136 +++++++++++ .../opendc/experiments/sc20/experiment/Run.kt | 143 +++++++++++ .../opendc/experiments/sc20/experiment/Scenario.kt | 48 ++++ .../sc20/experiment/model/OperationalPhenomena.kt | 33 +++ .../experiments/sc20/experiment/model/Topology.kt | 30 +++ .../experiments/sc20/experiment/model/Workload.kt | 30 +++ .../sc20/experiment/monitor/ExperimentMonitor.kt | 75 ++++++ .../experiment/monitor/ParquetExperimentMonitor.kt | 145 +++++++++++ .../sc20/reporter/ConsoleExperimentReporter.kt | 75 ++++++ .../sc20/reporter/ExperimentReporter.kt | 75 ------ .../sc20/reporter/ExperimentReporterProvider.kt | 40 ---- .../experiments/sc20/reporter/HostMetrics.kt | 44 ---- .../sc20/reporter/ParquetExperimentReporter.kt | 132 ----------- .../sc20/reporter/ParquetHostMetricsWriter.kt | 73 ------ .../sc20/reporter/ParquetMetricsWriter.kt | 114 --------- .../reporter/ParquetProvisionerMetricsWriter.kt | 65 ----- .../sc20/reporter/PostgresExperimentReporter.kt | 128 ---------- .../sc20/reporter/PostgresHostMetricsWriter.kt | 75 ------ .../sc20/reporter/PostgresMetricsWriter.kt | 124 ---------- .../reporter/PostgresProvisionerMetricsWriter.kt | 67 ------ .../sc20/reporter/ProvisionerMetrics.kt | 39 --- .../opendc/experiments/sc20/reporter/VmMetrics.kt | 43 ---- .../sc20/runner/ContainerExperimentDescriptor.kt | 68 ++++++ .../sc20/runner/ExperimentDescriptor.kt | 81 +++++++ .../experiments/sc20/runner/ExperimentRunner.kt | 51 ++++ .../sc20/runner/TrialExperimentDescriptor.kt | 32 +++ .../runner/execution/ExperimentExecutionContext.kt | 45 ++++ .../execution/ExperimentExecutionListener.kt | 48 ++++ .../runner/execution/ExperimentExecutionResult.kt | 42 ++++ .../sc20/runner/execution/ExperimentScheduler.kt | 59 +++++ .../execution/ThreadPoolExperimentScheduler.kt | 85 +++++++ .../runner/internal/DefaultExperimentRunner.kt | 62 +++++ .../opendc/experiments/sc20/telemetry/Event.kt | 35 +++ .../opendc/experiments/sc20/telemetry/HostEvent.kt | 44 ++++ .../experiments/sc20/telemetry/ProvisionerEvent.kt | 39 +++ .../opendc/experiments/sc20/telemetry/RunEvent.kt | 35 +++ .../opendc/experiments/sc20/telemetry/VmEvent.kt | 43 ++++ .../sc20/telemetry/parquet/ParquetEventWriter.kt | 121 ++++++++++ .../telemetry/parquet/ParquetHostEventWriter.kt | 81 +++++++ .../parquet/ParquetProvisionerEventWriter.kt | 67 ++++++ .../telemetry/parquet/ParquetRunEventWriter.kt | 78 ++++++ .../sc20/trace/Sc20ParquetTraceReader.kt | 3 +- .../experiments/sc20/trace/WorkloadSampler.kt | 69 ++++++ .../opendc/experiments/sc20/util/DatabaseHelper.kt | 160 ------------- .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 36 ++- 61 files changed, 2526 insertions(+), 2515 deletions(-) delete mode 100644 opendc/opendc-experiments-sc20/schema.sql delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 2ba07554..46d99564 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -31,27 +31,26 @@ plugins { } application { - mainClassName = "com.atlarge.opendc.experiments.sc20.ExperimentRunnerCliKt" - applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M") + mainClassName = "com.atlarge.opendc.experiments.sc20.MainKt" + applicationDefaultJvmArgs = listOf("-Xms2500M") } dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(kotlin("stdlib")) + implementation("com.github.ajalt:clikt:2.6.0") + implementation("me.tongfei:progressbar:0.8.1") implementation("io.github.microutils:kotlin-logging:1.7.9") + implementation("org.apache.parquet:parquet-avro:1.11.0") implementation("org.apache.hadoop:hadoop-client:3.2.1") { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } - implementation("com.zaxxer:HikariCP:3.4.5") - implementation("de.bytefish.pgbulkinsert:pgbulkinsert-core:5.1.0") - implementation("de.bytefish.pgbulkinsert:pgbulkinsert-rowwriter:5.1.0") - implementation("me.tongfei:progressbar:0.8.1") + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") - runtimeOnly("org.postgresql:postgresql:42.2.12") runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql deleted file mode 100644 index 92cb5d1f..00000000 --- a/opendc/opendc-experiments-sc20/schema.sql +++ /dev/null @@ -1,141 +0,0 @@ --- An experiment represents a collection of portfolios. -DROP TABLE IF EXISTS experiments CASCADE; -CREATE TABLE experiments -( - id BIGSERIAL PRIMARY KEY NOT NULL, - creation_time TIMESTAMP NOT NULL DEFAULT (now()) -); - --- A portfolio represents a collection of scenarios tested. -DROP TABLE IF EXISTS portfolios CASCADE; -CREATE TABLE portfolios -( - id BIGSERIAL PRIMARY KEY NOT NULL, - experiment_id BIGINT NOT NULL, - name TEXT NOT NULL, - - FOREIGN KEY (experiment_id) REFERENCES experiments (id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - --- A scenario represents a single point in the design space (a unique combination of parameters) -DROP TABLE IF EXISTS scenarios CASCADE; -CREATE TABLE scenarios -( - id BIGSERIAL PRIMARY KEY NOT NULL, - portfolio_id BIGINT NOT NULL, - repetitions INTEGER NOT NULL, - topology TEXT NOT NULL, - workload_name TEXT NOT NULL, - workload_fraction DOUBLE PRECISION NOT NULL, - allocation_policy TEXT NOT NULL, - failure_frequency DOUBLE PRECISION NOT NULL, - interference BOOLEAN NOT NULL, - - FOREIGN KEY (portfolio_id) REFERENCES portfolios (id) - ON DELETE CASCADE - ON UPDATE CASCADE - -); - -DROP TYPE IF EXISTS run_state CASCADE; -CREATE TYPE run_state AS ENUM ('wait', 'active', 'fail', 'ok'); - --- An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the --- same set of parameters. -DROP TABLE IF EXISTS runs CASCADE; -CREATE TABLE runs -( - id INTEGER NOT NULL, - scenario_id BIGINT NOT NULL, - seed INTEGER NOT NULL, - state run_state NOT NULL DEFAULT 'wait'::run_state, - submit_time TIMESTAMP NOT NULL DEFAULT (now()), - start_time TIMESTAMP DEFAULT (NULL), - end_time TIMESTAMP DEFAULT (NULL), - - PRIMARY KEY (scenario_id, id), - FOREIGN KEY (scenario_id) REFERENCES scenarios (id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - --- Metrics of the hypervisors reported per slice -DROP TABLE IF EXISTS host_metrics CASCADE; -CREATE TABLE host_metrics -( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - host_id TEXT NOT NULL, - state TEXT NOT NULL, - timestamp TIMESTAMP NOT NULL, - duration BIGINT NOT NULL, - vm_count INTEGER NOT NULL, - requested_burst BIGINT NOT NULL, - granted_burst BIGINT NOT NULL, - overcommissioned_burst BIGINT NOT NULL, - interfered_burst BIGINT NOT NULL, - cpu_usage DOUBLE PRECISION NOT NULL, - cpu_demand DOUBLE PRECISION NOT NULL, - power_draw DOUBLE PRECISION NOT NULL, - - FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - -DROP INDEX IF EXISTS host_metrics_idx; -CREATE INDEX host_metrics_idx ON host_metrics (scenario_id, run_id, timestamp, host_id); - --- Metrics of the VMs reported per slice -DROP TABLE IF EXISTS vm_metrics CASCADE; -CREATE TABLE vm_metrics -( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - vm_id TEXT NOT NULL, - host_id TEXT NOT NULL, - state TEXT NOT NULL, - timestamp TIMESTAMP NOT NULL, - duration BIGINT NOT NULL, - requested_burst BIGINT NOT NULL, - granted_burst BIGINT NOT NULL, - overcommissioned_burst BIGINT NOT NULL, - interfered_burst BIGINT NOT NULL, - cpu_usage DOUBLE PRECISION NOT NULL, - cpu_demand DOUBLE PRECISION NOT NULL, - - FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - -DROP INDEX IF EXISTS vm_metrics_idx; -CREATE INDEX vm_metrics_idx ON vm_metrics (scenario_id, run_id, timestamp, vm_id); - --- Metrics of the provisioner reported per change -DROP TABLE IF EXISTS provisioner_metrics CASCADE; -CREATE TABLE provisioner_metrics -( - id BIGSERIAL PRIMARY KEY NOT NULL, - scenario_id BIGINT NOT NULL, - run_id INTEGER NOT NULL, - timestamp TIMESTAMP NOT NULL, - host_total_count INTEGER NOT NULL, - host_available_count INTEGER NOT NULL, - vm_total_count INTEGER NOT NULL, - vm_active_count INTEGER NOT NULL, - vm_inactive_count INTEGER NOT NULL, - vm_waiting_count INTEGER NOT NULL, - vm_failed_count INTEGER NOT NULL, - - FOREIGN KEY (scenario_id, run_id) REFERENCES runs (scenario_id, id) - ON DELETE CASCADE - ON UPDATE CASCADE -); - -DROP INDEX IF EXISTS provisioner_metrics_idx; -CREATE INDEX provisioner_metrics_idx ON provisioner_metrics (scenario_id, run_id, timestamp); diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt deleted file mode 100644 index 1bc463c3..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ /dev/null @@ -1,258 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver -import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService -import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent -import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy -import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext -import mu.KotlinLogging -import java.io.File -import java.util.TreeSet -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** - * Construct the failure domain for the experiments. - */ -suspend fun createFailureDomain( - seed: Int, - failureInterval: Double, - bareMetalProvisioner: ProvisioningService, - chan: Channel -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = - injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } - } - return domain -} - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector( - domain, - iatScale = ln(failureInterval), iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, - dScale = 9.51, dShape = 3.21, // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - -/** - * Construct the environment for a VM provisioner and return the provisioner instance. - */ -suspend fun createProvisioner( - root: Domain, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): Pair = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) - - // Wait for the hypervisors to be spawned - delay(10) - - bareMetalProvisioner to scheduler -} - -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) { - val domain = simulationContext.domain - val clock = simulationContext.clock - val hypervisors = scheduler.drivers() - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) - hypervisor.server.events - .onEach { event -> - val time = clock.millis() - when (event) { - is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(time, hypervisor, event.server) - } - } - } - .launchIn(domain) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> reporter.reportHostSlice( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer - ) - } - } - .launchIn(domain) - - val driver = hypervisor.server.services[BareMetalDriver.Key] - driver.powerDraw - .onEach { reporter.reportPowerConsumption(hypervisor.server, it) } - .launchIn(domain) - } - - scheduler.events - .onEach { event -> - when (event) { - is VirtProvisioningEvent.MetricsAvailable -> - reporter.reportProvisionerMetrics(clock.millis(), event) - } - } - .launchIn(domain) -} - -/** - * Process the trace. - */ -suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, reporter: ExperimentReporter, vmPlacements: Map = emptyMap()) { - val domain = simulationContext.domain - - try { - var submitted = 0 - val finished = Channel(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - logger.warn { "Could not find placement data in VM placement file for VM $vmId" } - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false - if (machineInCluster) { - logger.info { "Ignored VM $vmId" } - continue - } - } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - val time = simulationContext.clock.millis() - - if (it is ServerEvent.StateChanged) { - reporter.reportVmStateChange(time, it.server) - } - - delay(1) - finished.send(Unit) - } - .collect() - } - } - - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { - finished.receive() - } - } finally { - reader.close() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt deleted file mode 100644 index e6fd504e..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ /dev/null @@ -1,216 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider -import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader -import me.tongfei.progressbar.ProgressBar -import mu.KotlinLogging -import java.io.Closeable -import java.io.File -import java.util.concurrent.ExecutorCompletionService -import java.util.concurrent.Executors -import javax.sql.DataSource - -/** - * The logger for the experiment runner. - */ -private val logger = KotlinLogging.logger {} - -/** - * The experiment runner is responsible for orchestrating the simulation runs of an experiment. - * - * @param portfolios The portfolios to consider. - * @param ds The data source to write the experimental results to. - */ -public class ExperimentRunner( - private val portfolios: List, - private val ds: DataSource, - private val reporterProvider: ExperimentReporterProvider, - private val environmentPath: File, - private val tracePath: File, - private val performanceInterferenceModel: PerformanceInterferenceModel?, - private val parallelism: Int = Runtime.getRuntime().availableProcessors() -) : Closeable { - /** - * The database helper to write the execution plan. - */ - private val helper = DatabaseHelper(ds.connection) - - /** - * The experiment identifier. - */ - private var experimentId = -1L - - /** - * The mapping of portfolios to their ids. - */ - private val portfolioIds = mutableMapOf() - - /** - * The mapping of scenarios to their ids. - */ - private val scenarioIds = mutableMapOf() - - init { - reporterProvider.init(ds) - } - - /** - * Create an execution plan - */ - private fun createPlan(): List { - val runs = mutableListOf() - - for (portfolio in portfolios) { - val portfolioId = helper.persist(portfolio, experimentId) - portfolioIds[portfolio] = portfolioId - var scenarios = 0 - var runCount = 0 - - for (scenario in portfolio.scenarios) { - val scenarioId = helper.persist(scenario, portfolioId) - scenarioIds[scenario] = scenarioId - scenarios++ - - for (run in scenario.runs) { - helper.persist(run, scenarioId) - runCount++ - runs.add(run) - } - } - - logger.info { "Portfolio $portfolioId: ${portfolio.name} ($scenarios scenarios, $runCount runs total)" } - } - - return runs - } - - /** - * The raw parquet trace readers that are shared across simulations. - */ - private val rawTraceReaders = mutableMapOf() - - /** - * Create a trace reader for the specified trace. - */ - private fun createTraceReader( - name: String, - performanceInterferenceModel: PerformanceInterferenceModel?, - run: Run - ): TraceReader { - val raw = rawTraceReaders.getValue(name) - return Sc20ParquetTraceReader( - raw, - performanceInterferenceModel, - run - ) - } - - /** - * Create the environment reader for the specified environment. - */ - private fun createEnvironmentReader(name: String): EnvironmentReader { - return Sc20ClusterEnvironmentReader(File(environmentPath, "$name.txt")) - } - - /** - * Run the portfolios. - */ - @OptIn(ExperimentalStdlibApi::class) - public fun run() { - experimentId = helper.createExperiment() - logger.info { "Creating execution plan for experiment $experimentId" } - - val plan = createPlan() - val total = plan.size - val completionService = ExecutorCompletionService(Executors.newCachedThreadPool()) - val pb = ProgressBar("Experiment", total.toLong()) - - var running = 0 - - for (run in plan) { - if (running >= parallelism) { - completionService.take() - running-- - } - - val scenarioId = scenarioIds[run.scenario]!! - - rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name -> - logger.info { "Loading trace $name" } - Sc20RawParquetTraceReader(File(tracePath, name)) - } - - completionService.submit { - pb.extraMessage = "($scenarioId, ${run.id}) START" - - var hasFailed = false - synchronized(helper) { - helper.startRun(scenarioId, run.id) - } - - try { - val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) - val traceReader = - createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) - val environmentReader = createEnvironmentReader(run.scenario.topology.name) - - try { - run.scenario(run, reporter, environmentReader, traceReader) - } finally { - reporter.close() - } - - pb.extraMessage = "($scenarioId, ${run.id}) OK" - } catch (e: Throwable) { - logger.error("A run has failed", e) - hasFailed = true - pb.extraMessage = "($scenarioId, ${run.id}) FAIL" - } finally { - synchronized(helper) { - helper.finishRun(scenarioId, run.id, hasFailed = hasFailed) - } - - pb.step() - } - } - - running++ - } - } - - override fun close() { - reporterProvider.close() - helper.close() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt deleted file mode 100644 index 631c1085..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ /dev/null @@ -1,219 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider -import com.atlarge.opendc.experiments.sc20.reporter.ParquetHostMetricsWriter -import com.atlarge.opendc.experiments.sc20.reporter.ParquetProvisionerMetricsWriter -import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter -import com.atlarge.opendc.experiments.sc20.reporter.PostgresProvisionerMetricsWriter -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.groups.required -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.multiple -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.types.choice -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int -import com.zaxxer.hikari.HikariDataSource -import mu.KotlinLogging -import java.io.File -import java.io.InputStream -import javax.sql.DataSource - -/** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** - * Represents the command for running the experiment. - */ -class ExperimentCli : CliktCommand(name = "sc20-experiment") { - /** - * The JDBC connection url to use. - */ - private val jdbcUrl by option("--jdbc-url", help = "JDBC connection url").required() - - /** - * The path to the directory where the topology descriptions are located. - */ - private val environmentPath by option("--environment-path", help = "path to the environment directory") - .file(canBeFile = false) - .required() - - /** - * The path to the directory where the traces are located. - */ - private val tracePath by option("--trace-path", help = "path to the traces directory") - .file(canBeFile = false) - .required() - - /** - * The path to the performance interference model. - */ - private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file") - .file() - .convert { it.inputStream() as InputStream } - - /** - * The path to the original VM placements file. - */ - private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") - .file() - .convert { - Sc20VmPlacementReader(it.inputStream().buffered()).construct() - } - .default(emptyMap()) - - /** - * The type of reporter to use. - */ - private val reporter by option().groupChoice( - "parquet" to Reporter.Parquet(), - "postgres" to Reporter.Postgres() - ).required() - - /** - * The selected portfolios to run. - */ - private val portfolios by option("--portfolio") - .choice( - "hor-ver" to HorVerPortfolio, - "more-velocity" to MoreVelocityPortfolio, - "more-hpc" to MoreHpcPortfolio, - "operational-phenomena" to OperationalPhenomenaPortfolio, - ignoreCase = true - ) - .multiple() - - /** - * The maximum number of worker threads to use. - */ - private val workerParallelism by option("--worker-parallelism") - .int() - .default(Runtime.getRuntime().availableProcessors()) - - /** - * The maximum number of host writer threads to use. - */ - private val hostWriterParallelism by option("--host-writer-parallelism") - .int() - .default(8) - - /** - * The maximum number of provisioner writer threads to use. - */ - private val provisionerWriterParallelism by option("--provisioner-writer-parallelism") - .int() - .default(1) - - /** - * The buffer size for writing results. - */ - private val bufferSize by option("--buffer-size") - .int() - .default(4096) - - override fun run() { - val ds = HikariDataSource() - ds.maximumPoolSize = Runtime.getRuntime().availableProcessors() * 3 - ds.jdbcUrl = jdbcUrl - ds.addDataSourceProperty("reWriteBatchedInserts", "true") - - reporter.bufferSize = bufferSize - reporter.hostParallelism = hostWriterParallelism - reporter.provisionerParallelism = provisionerWriterParallelism - - val performanceInterferenceModel = - performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } - - val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, workerParallelism) - - try { - runner.run() - } finally { - runner.close() - ds.close() - } - } -} - -/** - * An option for specifying the type of reporter to use. - */ -internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider { - var bufferSize = 4096 - var hostParallelism = 8 - var provisionerParallelism = 1 - - class Parquet : Reporter("Options for reporting using Parquet") { - private val path by option("--parquet-directory", help = "path to where the output should be stored") - .file() - .defaultLazy { File("data") } - - override fun createReporter(scenario: Long, run: Int): ExperimentReporter { - val hostWriter = ParquetHostMetricsWriter(File(path, "$scenario-$run-host.parquet"), bufferSize) - val provisionerWriter = ParquetProvisionerMetricsWriter(File(path, "$scenario-$run-provisioner.parquet"), bufferSize) - return ExperimentParquetReporter(scenario, run, hostWriter, provisionerWriter) - } - - override fun close() {} - } - - class Postgres : Reporter("Options for reporting using PostgreSQL") { - lateinit var hostWriter: PostgresHostMetricsWriter - lateinit var provisionerWriter: PostgresProvisionerMetricsWriter - - override fun init(ds: DataSource) { - hostWriter = PostgresHostMetricsWriter(ds, hostParallelism, bufferSize) - provisionerWriter = PostgresProvisionerMetricsWriter(ds, provisionerParallelism, bufferSize) - } - - override fun createReporter(scenario: Long, run: Int): ExperimentReporter { - return ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter) - } - - override fun close() { - hostWriter.close() - provisionerWriter.close() - } - } -} - -/** - * Main entry point of the experiment. - */ -fun main(args: Array) = ExperimentCli().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt new file mode 100644 index 00000000..e17a145c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -0,0 +1,157 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.experiments.sc20.experiment.Experiment +import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio +import com.atlarge.opendc.experiments.sc20.experiment.Portfolio +import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ThreadPoolExperimentScheduler +import com.atlarge.opendc.experiments.sc20.runner.internal.DefaultExperimentRunner +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.multiple +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.types.choice +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import mu.KotlinLogging +import java.io.File +import java.io.InputStream + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Represents the command for running the experiment. + */ +class ExperimentCli : CliktCommand(name = "sc20-experiment") { + /** + * The path to the directory where the topology descriptions are located. + */ + private val environmentPath by option("--environment-path", help = "path to the environment directory") + .file(canBeFile = false) + .required() + + /** + * The path to the directory where the traces are located. + */ + private val tracePath by option("--trace-path", help = "path to the traces directory") + .file(canBeFile = false) + .required() + + /** + * The path to the performance interference model. + */ + private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file") + .file() + .convert { it.inputStream() as InputStream } + + /** + * The path to the original VM placements file. + */ + private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") + .file() + .convert { + Sc20VmPlacementReader(it.inputStream().buffered()).construct() + } + .default(emptyMap()) + + /** + * The selected portfolios to run. + */ + private val portfolios by option("--portfolio") + .choice( + "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio, + "more-velocity" to ({ experiment, i -> MoreVelocityPortfolio(experiment, i) }), + "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) }, + "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) }, + ignoreCase = true + ) + .multiple() + + /** + * The maximum number of worker threads to use. + */ + private val parallelism by option("--parallelism") + .int() + .default(Runtime.getRuntime().availableProcessors()) + + /** + * The buffer size for writing results. + */ + private val bufferSize by option("--buffer-size") + .int() + .default(4096) + + /** + * The path to the output directory. + */ + private val output by option("-O", "--output", help = "path to the output directory") + .file(canBeFile = false) + .defaultLazy { File("data") } + + override fun run() { + logger.info { "Constructing performance interference model" } + + val performanceInterferenceModel = + performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } + + logger.info { "Creating experiment descriptor" } + val descriptor = object : Experiment(environmentPath, tracePath, output, performanceInterferenceModel, vmPlacements, bufferSize) { + private val descriptor = this + override val children: Sequence = sequence { + for ((i, producer) in portfolios.withIndex()) { + yield(producer(descriptor, i)) + } + } + } + + logger.info { "Starting experiment runner [parallelism=$parallelism]" } + val scheduler = ThreadPoolExperimentScheduler(parallelism) + val runner = DefaultExperimentRunner(scheduler) + try { + runner.execute(descriptor, ConsoleExperimentReporter()) + } finally { + scheduler.close() + } + } +} + +/** + * Main entry point of the experiment. + */ +fun main(args: Array) = ExperimentCli().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt deleted file mode 100644 index 34505fce..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolio.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -/** - * A portfolio represents a collection of scenarios are tested. - */ -public abstract class Portfolio(val name: String) { - /** - * The scenarios of this portfolio consists of. - */ - abstract val scenarios: Sequence -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt deleted file mode 100644 index 668304b6..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt +++ /dev/null @@ -1,165 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) { - abstract val topologies: List - abstract val workloads: List - abstract val operationalPhenomena: List> - abstract val allocationPolicies: List - - open val repetitions = 8 - - override val scenarios: Sequence = sequence { - for (topology in topologies) { - for (workload in workloads) { - for ((failureFrequency, hasInterference) in operationalPhenomena) { - for (allocationPolicy in allocationPolicies) { - yield( - Scenario( - this@AbstractSc20Portfolio, - repetitions, - topology, - workload, - allocationPolicy, - failureFrequency, - hasInterference - ) - ) - } - } - } - } - } -} - -private val defaultFailureInterval = 24.0 * 7 - -object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { - override val topologies = listOf( - Topology("base"), - Topology("rep-vol-hor-hom"), - Topology("rep-vol-hor-het"), - Topology("rep-vol-ver-hom"), - Topology("rep-vol-ver-het"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-hor-het"), - Topology("exp-vol-ver-hom"), - Topology("exp-vol-ver-het") - ) - - override val workloads = listOf( - // Workload("solvinity", 0.1), - // Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena = listOf( - defaultFailureInterval to true - ) - - override val allocationPolicies = listOf( - "active-servers" - ) -} - -object MoreVelocityPortfolio : AbstractSc20Portfolio("more_velocity") { - override val topologies = listOf( - Topology("base"), - Topology("rep-vel-ver-hom"), - Topology("rep-vel-ver-het"), - Topology("exp-vel-ver-hom"), - Topology("exp-vel-ver-het") - ) - - override val workloads = listOf( - // Workload("solvinity", 0.1), - // Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena = listOf( - defaultFailureInterval to true - ) - - override val allocationPolicies = listOf( - "active-servers" - ) -} - -object MoreHpcPortfolio : AbstractSc20Portfolio("more_hpc") { - override val topologies = listOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - - override val workloads = listOf( - // Workload("solvinity", 0.1), - // Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena = listOf( - defaultFailureInterval to true - ) - - override val allocationPolicies = listOf( - "active-servers" - ) -} - -object OperationalPhenomenaPortfolio : AbstractSc20Portfolio("operational_phenomena") { - override val topologies = listOf( - Topology("base") - ) - - override val workloads = listOf( - // Workload("solvinity", 0.1), - // Workload("solvinity", 0.25), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena = listOf( - defaultFailureInterval to true, - 0.0 to true, - defaultFailureInterval to false, - defaultFailureInterval to true - ) - - override val allocationPolicies = listOf( - "mem", - "mem-inv", - "core-mem", - "core-mem-inv", - "active-servers", - "active-servers-inv", - "random" - ) -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt deleted file mode 100644 index b2151b16..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Run.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -/** - * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the - * same set of parameters. - */ -public data class Run(val scenario: Scenario, val id: Int, val seed: Int) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt deleted file mode 100644 index 457255cb..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Scenario.kt +++ /dev/null @@ -1,128 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import mu.KotlinLogging -import java.util.ServiceLoader -import kotlin.random.Random - -/** - * The logger for the experiment scenario. - */ -private val logger = KotlinLogging.logger {} - -/** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - -/** - * A scenario represents a single point in the design space (a unique combination of parameters). - */ -public class Scenario( - val portfolio: Portfolio, - val repetitions: Int, - val topology: Topology, - val workload: Workload, - val allocationPolicy: String, - val failureFrequency: Double, - val hasInterference: Boolean -) { - /** - * The runs this scenario consists of. - */ - public val runs: Sequence = sequence { - repeat(repetitions) { i -> - yield(Run(this@Scenario, i, i)) - } - } - - /** - * Perform a single run of this scenario. - */ - public operator fun invoke(run: Run, reporter: ExperimentReporter, environment: EnvironmentReader, trace: TraceReader) { - val system = provider("experiment-${run.id}") - val root = system.newDomain("root") - val seeder = Random(run.seed) - - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = when (this.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - "replay" -> ReplayAllocationPolicy(emptyMap()) - else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}") - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environment, allocationPolicy) - - val failureDomain = if (failureFrequency > 0) { - logger.debug("ENABLING failures") - createFailureDomain(seeder.nextInt(), failureFrequency, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, reporter) - processTrace(trace, scheduler, chan, reporter, emptyMap()) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - } - - runBlocking { - system.run() - system.terminate() - } - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt deleted file mode 100644 index d2be9599..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Topology.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -/** - * The datacenter topology on which we test the workload. - */ -public data class Topology(val name: String) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt deleted file mode 100644 index 4ab5ec8c..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Workload.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -/** - * A workload that is considered for a scenario. - */ -public class Workload(val name: String, val fraction: Double) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt deleted file mode 100644 index 99634e1b..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.format.trace.TraceEntry -import mu.KotlinLogging -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Sample the workload for the specified [run]. - */ -fun sampleWorkload(trace: List>, run: Run): List> { - return sampleRegularWorkload(trace, run) -} - -/** - * Sample a regular (non-HPC) workload. - */ -fun sampleRegularWorkload(trace: List>, run: Run): List> { - val fraction = run.scenario.workload.fraction - if (fraction >= 1) { - return trace - } - - val shuffled = trace.shuffled(Random(run.seed)) - val res = mutableListOf>() - val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } - var currentLoad = 0.0 - - for (entry in shuffled) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt new file mode 100644 index 00000000..5feb5917 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt @@ -0,0 +1,78 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetRunEventWriter +import java.io.File + +/** + * The global configuration of the experiment. + * + * @param environments The path to the topologies directory. + * @param traces The path to the traces directory. + * @param output The output directory. + * @param performanceInterferenceModel The optional performance interference model that has been specified. + * @param vmPlacements Original VM placement in the trace. + * @param bufferSize The buffer size of the event reporters. + */ +public abstract class Experiment( + val environments: File, + val traces: File, + val output: File, + val performanceInterferenceModel: PerformanceInterferenceModel?, + val vmPlacements: Map, + val bufferSize: Int +) : ContainerExperimentDescriptor() { + override val parent: ExperimentDescriptor? = null + + override suspend fun invoke(context: ExperimentExecutionContext) { + val writer = ParquetRunEventWriter(File(output, "experiments.parquet"), bufferSize) + try { + val listener = object : ExperimentExecutionListener by context.listener { + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + if (descriptor is Run) { + writer.write(RunEvent(descriptor, System.currentTimeMillis())) + } + + context.listener.descriptorRegistered(descriptor) + } + } + + val newContext = object : ExperimentExecutionContext by context { + override val listener: ExperimentExecutionListener = listener + } + + super.invoke(newContext) + } finally { + writer.close() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt new file mode 100644 index 00000000..32dc87ef --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -0,0 +1,264 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor +import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import mu.KotlinLogging +import java.io.File +import java.util.TreeSet +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain( + seed: Int, + failureInterval: Double, + bareMetalProvisioner: ProvisioningService, + chan: Channel +): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = + injectors.getOrPut(cluster) { + createFaultInjector( + simulationContext.domain, + random, + failureInterval + ) + } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector( + domain, + iatScale = ln(failureInterval), iatShape = 1.03, // Hours + sizeScale = 1.88, sizeShape = 1.25, + dScale = 9.51, dShape = 3.21, // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. + */ +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20StreamingParquetTraceReader { + return Sc20StreamingParquetTraceReader( + path, + performanceInterferenceModel, + vms, + Random(seed) + ) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + + // Wait for the hypervisors to be spawned + delay(10) + + bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { + val domain = simulationContext.domain + val clock = simulationContext.clock + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) + hypervisor.server.events + .onEach { event -> + val time = clock.millis() + when (event) { + is ServerEvent.StateChanged -> { + monitor.reportHostStateChange(time, hypervisor, event.server) + } + } + } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer + ) + } + } + .launchIn(domain) + + val driver = hypervisor.server.services[BareMetalDriver.Key] + driver.powerDraw + .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .launchIn(domain) + } + + scheduler.events + .onEach { event -> + when (event) { + is VirtProvisioningEvent.MetricsAvailable -> + monitor.reportProvisionerMetrics(clock.millis(), event) + } + } + .launchIn(domain) +} + +/** + * Process the trace. + */ +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, monitor: ExperimentMonitor, vmPlacements: Map = emptyMap()) { + val domain = simulationContext.domain + + try { + var submitted = 0 + val finished = Channel(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + logger.warn { "Could not find placement data in VM placement file for VM $vmId" } + continue + } + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + logger.info { "Ignored VM $vmId" } + continue + } + } + + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + val time = simulationContext.clock.millis() + + if (it is ServerEvent.StateChanged) { + monitor.reportVmStateChange(time, it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() + } + } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } + } finally { + reader.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt new file mode 100644 index 00000000..6a40f5fb --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt @@ -0,0 +1,90 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor + +/** + * A portfolio represents a collection of scenarios are tested. + */ +public abstract class Portfolio( + override val parent: Experiment, + val id: Int, + val name: String +) : ContainerExperimentDescriptor() { + /** + * The topologies to consider. + */ + protected abstract val topologies: List + + /** + * The workloads to consider. + */ + protected abstract val workloads: List + + /** + * The operational phenomenas to consider. + */ + protected abstract val operationalPhenomenas: List + + /** + * The allocation policies to consider. + */ + protected abstract val allocationPolicies: List + + /** + * The number of repetitions to perform. + */ + open val repetitions: Int = 32 + + /** + * Resolve the children of this container. + */ + override val children: Sequence = sequence { + var id = 0 + for (topology in topologies) { + for (workload in workloads) { + for (operationalPhenomena in operationalPhenomenas) { + for (allocationPolicy in allocationPolicies) { + yield( + Scenario( + this@Portfolio, + id++, + repetitions, + topology, + workload, + allocationPolicy, + operationalPhenomena + ) + ) + } + } + } + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt new file mode 100644 index 00000000..4800ceba --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -0,0 +1,136 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload + +public class HorVerPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "horizontal_vs_vertical") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vol-hor-hom"), + Topology("rep-vol-hor-het"), + Topology("rep-vol-ver-hom"), + Topology("rep-vol-ver-het"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-hor-het"), + Topology("exp-vol-ver-hom"), + Topology("exp-vol-ver-het") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_velocity") { + override val topologies = listOf( + Topology("base"), + Topology("rep-vel-ver-hom"), + Topology("rep-vel-ver-het"), + Topology("exp-vel-ver-hom"), + Topology("exp-vel-ver-het") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") { + override val topologies = listOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicies = listOf( + "active-servers" + ) +} + +public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "operational_phenomena") { + override val topologies = listOf( + Topology("base") + ) + + override val workloads = listOf( + // Workload("solvinity", 0.1), + // Workload("solvinity", 0.25), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) + ) + + override val allocationPolicies = listOf( + "mem", + "mem-inv", + "core-mem", + "core-mem-inv", + "active-servers", + "active-servers-inv", + "random" + ) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt new file mode 100644 index 00000000..6d53fd17 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -0,0 +1,143 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor +import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import mu.KotlinLogging +import java.io.File +import java.util.ServiceLoader +import kotlin.random.Random + +/** + * The logger for the experiment scenario. + */ +private val logger = KotlinLogging.logger {} + +/** + * The provider for the simulation engine to use. + */ +private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + +/** + * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the + * same set of parameters. + */ +public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() { + override suspend fun invoke(context: ExperimentExecutionContext) { + val experiment = parent.parent.parent + val system = provider("experiment-$id") + val root = system.newDomain("root") + val seeder = Random(seed) + val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt")) + + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = when (parent.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + "replay" -> ReplayAllocationPolicy(emptyMap()) + else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}") + } + + @Suppress("UNCHECKED_CAST") + val rawTraceReaders = context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf() } as MutableMap + val raw = synchronized(rawTraceReaders) { + val name = parent.workload.name + rawTraceReaders.computeIfAbsent(name) { + logger.info { "Loading trace $name" } + Sc20RawParquetTraceReader(File(experiment.traces, name)) + } + } + val trace = Sc20ParquetTraceReader(raw, experiment.performanceInterferenceModel, this) + + val monitor = ParquetExperimentMonitor(this) + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + root, + environment, + allocationPolicy + ) + + val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { + logger.debug("ENABLING failures") + createFailureDomain( + seeder.nextInt(), + parent.operationalPhenomena.failureFrequency, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace( + trace, + scheduler, + chan, + monitor, + experiment.vmPlacements + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + system.run() + } finally { + system.terminate() + monitor.close() + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt new file mode 100644 index 00000000..98bc7fc2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment + +import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena +import com.atlarge.opendc.experiments.sc20.experiment.model.Topology +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor + +/** + * A scenario represents a single point in the design space (a unique combination of parameters). + */ +public class Scenario( + override val parent: Portfolio, + val id: Int, + val repetitions: Int, + val topology: Topology, + val workload: Workload, + val allocationPolicy: String, + val operationalPhenomena: OperationalPhenomena +) : ContainerExperimentDescriptor() { + override val children: Sequence = sequence { + repeat(repetitions) { i -> yield(Run(this@Scenario, i, i)) } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt new file mode 100644 index 00000000..af99df84 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.model + +/** + * Operation phenomena during experiments. + * + * @param failureFrequency The average time between failures in hours. + * @param hasInterference A flag to enable performance interference between VMs. + */ +public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt new file mode 100644 index 00000000..3ed71e09 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.model + +/** + * The datacenter topology on which we test the workload. + */ +public data class Topology(val name: String) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt new file mode 100644 index 00000000..2dbdf570 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.model + +/** + * A workload that is considered for a scenario. + */ +public class Workload(val name: String, val fraction: Double) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt new file mode 100644 index 00000000..1f674f00 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.monitor + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent +import java.io.Closeable + +/** + * A monitor watches the events of an experiment. + */ +interface ExperimentMonitor : Closeable { + /** + * This method is invoked when the state of a VM changes. + */ + fun reportVmStateChange(time: Long, server: Server) {} + + /** + * This method is invoked when the state of a host changes. + */ + fun reportHostStateChange( + time: Long, + driver: VirtDriver, + server: Server + ) {} + + /** + * Report the power consumption of a host. + */ + fun reportPowerConsumption(host: Server, draw: Double) {} + + /** + * This method is invoked for a host for each slice that is finishes. + */ + fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long = 5 * 60 * 1000L + ) {} + + /** + * This method is invoked for a provisioner event. + */ + fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt new file mode 100644 index 00000000..33978aab --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -0,0 +1,145 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.experiment.monitor + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent +import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent +import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter +import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter +import mu.KotlinLogging +import java.io.File + +/** + * The logger instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * An [ExperimentMonitor] that logs the events to a Parquet file. + */ +class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { + private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}" + private val hostWriter = ParquetHostEventWriter( + File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"), + run.parent.parent.parent.bufferSize + ) + private val provisionerWriter = ParquetProvisionerEventWriter( + File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"), + run.parent.parent.parent.bufferSize + ) + private val lastServerStates = mutableMapOf>() + + override fun reportVmStateChange(time: Long, server: Server) {} + + override fun reportHostStateChange( + time: Long, + driver: VirtDriver, + server: Server + ) { + logger.debug("Host ${server.uid} changed state ${server.state} [$time]") + + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = time - lastServerState.second + reportHostSlice( + time, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + duration + ) + + lastServerStates.remove(server) + lastPowerConsumption.remove(server) + } else { + lastServerStates[server] = Pair(server.state, time) + } + } + + private val lastPowerConsumption = mutableMapOf() + + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw + } + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long + ) { + hostWriter.write( + HostEvent( + time, + duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0 + ) + ) + } + + override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + provisionerWriter.write( + ProvisionerEvent( + time, + event.totalHostCount, + event.availableHostCount, + event.totalVmCount, + event.activeVmCount, + event.inactiveVmCount, + event.waitingVmCount, + event.failedVmCount + ) + ) + } + + override fun close() { + hostWriter.close() + provisionerWriter.close() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt new file mode 100644 index 00000000..f59402d5 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import me.tongfei.progressbar.ProgressBar +import me.tongfei.progressbar.ProgressBarBuilder + +/** + * A reporter that reports the experiment progress to the console. + */ +public class ConsoleExperimentReporter : ExperimentExecutionListener { + /** + * The active [Run]s. + */ + private val runs: MutableSet = mutableSetOf() + + /** + * The total number of runs. + */ + private var total = 0 + + /** + * The progress bar to keep track of the progress. + */ + private val pb: ProgressBar = ProgressBarBuilder() + .setTaskName("") + .setInitialMax(1) + .build() + + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + if (descriptor is Run) { + runs += descriptor + pb.maxHint((++total).toLong()) + } + } + + override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) { + if (descriptor is Run) { + runs -= descriptor + + pb.stepTo(total - runs.size.toLong()) + if (runs.isEmpty()) { + pb.close() + } + } + } + + override fun executionStarted(descriptor: ExperimentDescriptor) {} +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt deleted file mode 100644 index 049035cc..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent -import java.io.Closeable - -/** - * A reporter used by experiments to report metrics. - */ -interface ExperimentReporter : Closeable { - /** - * This method is invoked when the state of a VM changes. - */ - fun reportVmStateChange(time: Long, server: Server) {} - - /** - * This method is invoked when the state of a host changes. - */ - fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) {} - - /** - * Report the power consumption of a host. - */ - fun reportPowerConsumption(host: Server, draw: Double) {} - - /** - * This method is invoked for a host for each slice that is finishes. - */ - fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long = 5 * 60 * 1000L - ) {} - - /** - * This method is invoked for a provisioner event. - */ - fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt deleted file mode 100644 index 8f42cdd4..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporterProvider.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import java.io.Closeable -import javax.sql.DataSource - -interface ExperimentReporterProvider : Closeable { - /** - * Initialize the provider with the specified data source. - */ - public fun init(ds: DataSource) {} - - /** - * Create a reporter for a single run. - */ - public fun createReporter(scenario: Long, run: Int): ExperimentReporter -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt deleted file mode 100644 index 061f6cce..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/HostMetrics.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import com.atlarge.opendc.compute.core.Server - -/** - * A periodic report of the host machine metrics. - */ -data class HostMetrics( - val time: Long, - val duration: Long, - val host: Server, - val vmCount: Int, - val requestedBurst: Long, - val grantedBurst: Long, - val overcommissionedBurst: Long, - val interferedBurst: Long, - val cpuUsage: Double, - val cpuDemand: Double, - val powerDraw: Double -) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt deleted file mode 100644 index 9426933e..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt +++ /dev/null @@ -1,132 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent -import mu.KotlinLogging - -private val logger = KotlinLogging.logger {} - -class ExperimentParquetReporter( - val scenario: Long, - val run: Int, - val hostWriter: ParquetHostMetricsWriter, - val provisionerWriter: ParquetProvisionerMetricsWriter -) : - ExperimentReporter { - private val lastServerStates = mutableMapOf>() - - override fun reportVmStateChange(time: Long, server: Server) {} - - override fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) { - logger.debug("Host ${server.uid} changed state ${server.state} [$time]") - - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = time - lastServerState.second - reportHostSlice( - time, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - duration - ) - - lastServerStates.remove(server) - lastPowerConsumption.remove(server) - } else { - lastServerStates[server] = Pair(server.state, time) - } - } - - private val lastPowerConsumption = mutableMapOf() - - override fun reportPowerConsumption(host: Server, draw: Double) { - lastPowerConsumption[host] = draw - } - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long - ) { - hostWriter.write( - scenario, run, HostMetrics( - time, - duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 - ) - ) - } - - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { - provisionerWriter.write( - scenario, - run, - ProvisionerMetrics( - time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount - ) - ) - } - - override fun close() { - hostWriter.close() - provisionerWriter.close() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt deleted file mode 100644 index 33839191..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetHostMetricsWriter.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import java.io.File - -private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("scenario_id").type().longType().noDefault() - .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .endRecord() - -public class ParquetHostMetricsWriter(path: File, batchSize: Int) : - ParquetMetricsWriter(path, schema, batchSize) { - - override fun persist(action: Action.Write, row: GenericData.Record) { - row.put("scenario_id", action.scenario) - row.put("run_id", action.run) - row.put("host_id", action.metrics.host.name) - row.put("state", action.metrics.host.state.name) - row.put("timestamp", action.metrics.time) - row.put("duration", action.metrics.duration) - row.put("vm_count", action.metrics.vmCount) - row.put("requested_burst", action.metrics.requestedBurst) - row.put("granted_burst", action.metrics.grantedBurst) - row.put("overcommissioned_burst", action.metrics.overcommissionedBurst) - row.put("interfered_burst", action.metrics.interferedBurst) - row.put("cpu_usage", action.metrics.cpuUsage) - row.put("cpu_demand", action.metrics.cpuDemand) - row.put("power_draw", action.metrics.powerDraw) - } - - override fun toString(): String = "host-writer" -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt deleted file mode 100644 index e82e9e47..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetMetricsWriter.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -private val logger = KotlinLogging.logger {} - -public abstract class ParquetMetricsWriter( - private val path: File, - private val schema: Schema, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) - private val writerThread = thread(start = true, name = "parquet-writer") { run() } - - /** - * Write the specified metrics to the database. - */ - public fun write(scenario: Long, run: Int, metrics: T) { - queue.put(Action.Write(scenario, run, metrics)) - } - - /** - * Signal the writer to stop. - */ - public override fun close() { - queue.put(Action.Stop) - writerThread.join() - } - - /** - * Persist the specified metrics to the given [row]. - */ - public abstract fun persist(action: Action.Write, row: GenericData.Record) - - /** - * Start the writer thread. - */ - override fun run() { - val writer = AvroParquetWriter.builder(Path(path.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - persist(action as Action.Write, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - data class Write(val scenario: Long, val run: Int, val metrics: T) : Action() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt deleted file mode 100644 index 0c74b23e..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetProvisionerMetricsWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import java.io.File - -private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("scenario_id").type().longType().noDefault() - .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() - .endRecord() - -public class ParquetProvisionerMetricsWriter(path: File, batchSize: Int) : - ParquetMetricsWriter(path, schema, batchSize) { - - override fun persist(action: Action.Write, row: GenericData.Record) { - row.put("scenario_id", action.scenario) - row.put("run_id", action.run) - row.put("timestamp", action.metrics.time) - row.put("host_total_count", action.metrics.totalHostCount) - row.put("host_available_count", action.metrics.availableHostCount) - row.put("vm_total_count", action.metrics.totalVmCount) - row.put("vm_active_count", action.metrics.activeVmCount) - row.put("vm_inactive_count", action.metrics.inactiveVmCount) - row.put("vm_waiting_count", action.metrics.waitingVmCount) - row.put("vm_failed_count", action.metrics.failedVmCount) - } - - override fun toString(): String = "host-writer" -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt deleted file mode 100644 index 595b9777..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt +++ /dev/null @@ -1,128 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent -import mu.KotlinLogging - -private val logger = KotlinLogging.logger {} - -class ExperimentPostgresReporter( - val scenario: Long, - val run: Int, - val hostWriter: PostgresHostMetricsWriter, - val provisionerWriter: PostgresProvisionerMetricsWriter -) : ExperimentReporter { - private val lastServerStates = mutableMapOf>() - - override fun reportVmStateChange(time: Long, server: Server) {} - - override fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) { - val lastServerState = lastServerStates[server] - logger.debug("Host ${server.uid} changed state ${server.state} [$time]") - - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = time - lastServerState.second - reportHostSlice( - time, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - duration - ) - - lastServerStates.remove(server) - lastPowerConsumption.remove(server) - } else { - lastServerStates[server] = Pair(server.state, time) - } - } - - private val lastPowerConsumption = mutableMapOf() - - override fun reportPowerConsumption(host: Server, draw: Double) { - lastPowerConsumption[host] = draw - } - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long - ) { - hostWriter.write( - scenario, run, HostMetrics( - time, - duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 - ) - ) - } - - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { - provisionerWriter.write( - scenario, - run, - ProvisionerMetrics( - time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount - ) - ) - } - - override fun close() {} -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt deleted file mode 100644 index 57e665ae..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import de.bytefish.pgbulkinsert.row.SimpleRow -import de.bytefish.pgbulkinsert.row.SimpleRowWriter -import javax.sql.DataSource - -private val table: SimpleRowWriter.Table = SimpleRowWriter.Table( - "host_metrics", - *arrayOf( - "scenario_id", - "run_id", - "host_id", - "state", - "timestamp", - "duration", - "vm_count", - "requested_burst", - "granted_burst", - "overcommissioned_burst", - "interfered_burst", - "cpu_usage", - "cpu_demand", - "power_draw" - ) -) - -/** - * A [PostgresMetricsWriter] for persisting [HostMetrics]. - */ -public class PostgresHostMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : - PostgresMetricsWriter(ds, table, parallelism, batchSize) { - - override fun persist(action: Action.Write, row: SimpleRow) { - row.setLong("scenario_id", action.scenario) - row.setInteger("run_id", action.run) - row.setText("host_id", action.metrics.host.name) - row.setText("state", action.metrics.host.state.name) - row.setLong("timestamp", action.metrics.time) - row.setLong("duration", action.metrics.duration) - row.setInteger("vm_count", action.metrics.vmCount) - row.setLong("requested_burst", action.metrics.requestedBurst) - row.setLong("granted_burst", action.metrics.grantedBurst) - row.setLong("overcommissioned_burst", action.metrics.overcommissionedBurst) - row.setLong("interfered_burst", action.metrics.interferedBurst) - row.setDouble("cpu_usage", action.metrics.cpuUsage) - row.setDouble("cpu_demand", action.metrics.cpuDemand) - row.setDouble("power_draw", action.metrics.powerDraw) - } - - override fun toString(): String = "host-writer" -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt deleted file mode 100644 index bee01e51..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt +++ /dev/null @@ -1,124 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import de.bytefish.pgbulkinsert.row.SimpleRow -import de.bytefish.pgbulkinsert.row.SimpleRowWriter -import org.postgresql.PGConnection -import java.io.Closeable -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import javax.sql.DataSource - -/** - * The experiment writer is a separate thread that is responsible for writing the results to the - * database. - */ -public abstract class PostgresMetricsWriter( - private val ds: DataSource, - private val table: SimpleRowWriter.Table, - private val parallelism: Int = 8, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue = ArrayBlockingQueue(parallelism * bufferSize) - - /** - * The executor service to use. - */ - private val executorService = Executors.newFixedThreadPool(parallelism) - - /** - * Write the specified metrics to the database. - */ - public fun write(scenario: Long, run: Int, metrics: T) { - queue.put(Action.Write(scenario, run, metrics)) - } - - /** - * Signal the writer to stop. - */ - public override fun close() { - repeat(parallelism) { - queue.put(Action.Stop) - } - executorService.shutdown() - executorService.awaitTermination(5, TimeUnit.MINUTES) - } - - /** - * Persist the specified metrics to the given [row]. - */ - public abstract fun persist(action: Action.Write, row: SimpleRow) - - init { - repeat(parallelism) { - executorService.submit { run() } - } - } - - /** - * Start the writer thread. - */ - override fun run() { - val conn = ds.connection - - val writer = SimpleRowWriter(table) - writer.open(conn.unwrap(PGConnection::class.java)) - - try { - - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> writer.startRow { - @Suppress("UNCHECKED_CAST") - persist(action as Action.Write, it) - } - } - } - } finally { - writer.close() - conn.close() - } - } - - sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - data class Write(val scenario: Long, val run: Int, val metrics: T) : Action() - } -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt deleted file mode 100644 index 17788112..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import de.bytefish.pgbulkinsert.row.SimpleRow -import de.bytefish.pgbulkinsert.row.SimpleRowWriter -import javax.sql.DataSource - -private val table: SimpleRowWriter.Table = SimpleRowWriter.Table( - "provisioner_metrics", - *arrayOf( - "scenario_id", - "run_id", - "timestamp", - "host_total_count", - "host_available_count", - "vm_total_count", - "vm_active_count", - "vm_inactive_count", - "vm_waiting_count", - "vm_failed_count" - ) -) - -/** - * A [PostgresMetricsWriter] for persisting [ProvisionerMetrics]. - */ -public class PostgresProvisionerMetricsWriter(ds: DataSource, parallelism: Int, batchSize: Int) : - PostgresMetricsWriter(ds, table, parallelism, batchSize) { - - override fun persist(action: Action.Write, row: SimpleRow) { - row.setLong("scenario_id", action.scenario) - row.setInteger("run_id", action.run) - row.setLong("timestamp", action.metrics.time) - row.setInteger("host_total_count", action.metrics.totalHostCount) - row.setInteger("host_available_count", action.metrics.availableHostCount) - row.setInteger("vm_total_count", action.metrics.totalVmCount) - row.setInteger("vm_active_count", action.metrics.activeVmCount) - row.setInteger("vm_inactive_count", action.metrics.inactiveVmCount) - row.setInteger("vm_waiting_count", action.metrics.waitingVmCount) - row.setInteger("vm_failed_count", action.metrics.failedVmCount) - } - - override fun toString(): String = "provisioner-writer" -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt deleted file mode 100644 index 966662cd..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ProvisionerMetrics.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -/** - * A periodic report of the provisioner's metrics. - */ -data class ProvisionerMetrics( - val time: Long, - val totalHostCount: Int, - val availableHostCount: Int, - val totalVmCount: Int, - val activeVmCount: Int, - val inactiveVmCount: Int, - val waitingVmCount: Int, - val failedVmCount: Int -) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt deleted file mode 100644 index 5f963206..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/VmMetrics.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.reporter - -import com.atlarge.opendc.compute.core.Server - -/** - * A periodic report of a virtual machine's metrics. - */ -data class VmMetrics( - val time: Long, - val duration: Long, - val vm: Server, - val host: Server, - val requestedBurst: Long, - val grantedBurst: Long, - val overcommissionedBurst: Long, - val interferedBurst: Long, - val cpuUsage: Double, - val cpuDemand: Double -) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt new file mode 100644 index 00000000..dac32586 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope + +/** + * An abstract [ExperimentDescriptor] specifically for containers. + */ +public abstract class ContainerExperimentDescriptor : ExperimentDescriptor() { + /** + * The child descriptors of this container. + */ + public abstract val children: Sequence + + override val type: Type = Type.CONTAINER + + override suspend fun invoke(context: ExperimentExecutionContext) { + val materializedChildren = children.toList() + for (child in materializedChildren) { + context.listener.descriptorRegistered(child) + } + + supervisorScope { + for (child in materializedChildren) { + if (child.isTrial) { + launch { + val worker = context.scheduler.allocate() + context.listener.executionStarted(child) + try { + worker(child, context) + context.listener.executionFinished(child, ExperimentExecutionResult.Success) + } catch (e: Throwable) { + context.listener.executionFinished(child, ExperimentExecutionResult.Failed(e)) + } + } + } else { + launch { child(context) } + } + } + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt new file mode 100644 index 00000000..64b6b767 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt @@ -0,0 +1,81 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import java.io.Serializable + +/** + * An immutable description of an experiment in the **odcsim* simulation framework, which may be a single atomic trial + * or a composition of multiple trials. + * + * This class represents a dynamic tree-like structure where the children of the nodes are not known at instantiation + * since they might be generated dynamically. + */ +public abstract class ExperimentDescriptor : Serializable { + /** + * The parent of this descriptor, or `null` if it has no parent. + */ + public abstract val parent: ExperimentDescriptor? + + /** + * The type of descriptor. + */ + abstract val type: Type + + /** + * A flag to indicate that this descriptor is a root descriptor. + */ + public open val isRoot: Boolean + get() = parent == null + + /** + * A flag to indicate that this descriptor describes an experiment trial. + */ + val isTrial: Boolean + get() = type == Type.TRIAL + + /** + * Execute this [ExperimentDescriptor]. + * + * @param context The context to execute the descriptor in. + */ + public abstract suspend operator fun invoke(context: ExperimentExecutionContext) + + /** + * The types of experiment descriptors. + */ + enum class Type { + /** + * A composition of multiple experiment descriptions whose invocation happens on a single thread. + */ + CONTAINER, + + /** + * An invocation of a single scenario of an experiment whose invocation may happen on different threads. + */ + TRIAL + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt new file mode 100644 index 00000000..77f970fe --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener + +/** + * An [ExperimentRunner] facilitates discovery and execution of experiments. + */ +public interface ExperimentRunner { + /** + * The unique identifier of this runner. + */ + val id: String + + /** + * The version of this runner. + */ + val version: String? + get() = null + + /** + * Execute the specified experiment represented as [ExperimentDescriptor]. + * + * @param root The experiment to execute. + * @param listener The listener to report events to. + */ + public fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt new file mode 100644 index 00000000..cf05416a --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt @@ -0,0 +1,32 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner + +/** + * An abstract [ExperimentDescriptor] specifically for trials. + */ +public abstract class TrialExperimentDescriptor : ExperimentDescriptor() { + override val type: Type = Type.TRIAL +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt new file mode 100644 index 00000000..9a04c491 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +/** + * The execution context of an experiment. + */ +public interface ExperimentExecutionContext { + /** + * The execution listener to use. + */ + public val listener: ExperimentExecutionListener + + /** + * The experiment scheduler to use. + */ + public val scheduler: ExperimentScheduler + + /** + * A cache for objects within a single runner. + */ + public val cache: MutableMap +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt new file mode 100644 index 00000000..f6df0524 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor + +/** + * Listener to be notified of experiment execution events by experiment runners. + */ +interface ExperimentExecutionListener { + /** + * A method that is invoked when a new [ExperimentDescriptor] is registered. + */ + fun descriptorRegistered(descriptor: ExperimentDescriptor) + + /** + * A method that is invoked when when the execution of a leaf or subtree of the experiment tree has finished, + * regardless of the outcome. + */ + fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) + + /** + * A method that is invoked when the execution of a leaf or subtree of the experiment tree is about to be started. + */ + fun executionStarted(descriptor: ExperimentDescriptor) +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt new file mode 100644 index 00000000..057e1f92 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import java.io.Serializable + +/** + * The result of executing an experiment. + */ +public sealed class ExperimentExecutionResult : Serializable { + /** + * The experiment executed successfully + */ + public object Success : ExperimentExecutionResult() + + /** + * The experiment failed during execution. + */ + public data class Failed(val throwable: Throwable) : ExperimentExecutionResult() +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt new file mode 100644 index 00000000..0346a7f8 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import java.io.Closeable + +/** + * A interface for scheduling the execution of experiment trials over compute resources (threads/containers/vms) + */ +interface ExperimentScheduler : Closeable { + /** + * Allocate a [Worker] for executing an experiment trial. This method may suspend in case no resources are directly + * available at the moment. + * + * @return The available worker. + */ + suspend fun allocate(): ExperimentScheduler.Worker + + /** + * An isolated worker of an [ExperimentScheduler] that is responsible for executing a single experiment trial. + */ + interface Worker { + /** + * Dispatch the specified [ExperimentDescriptor] to execute some time in the future and return the results of + * the trial. + * + * @param descriptor The descriptor to execute. + * @param context The context to execute the descriptor in. + * @return The results of the experiment trial. + */ + suspend operator fun invoke( + descriptor: ExperimentDescriptor, + context: ExperimentExecutionContext + ): ExperimentExecutionResult + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt new file mode 100644 index 00000000..31632b8c --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt @@ -0,0 +1,85 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.execution + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors + +/** + * An [ExperimentScheduler] that runs experiments using a local thread pool. + * + * @param parallelism The maximum amount of parallel workers (default is the number of available processors). + */ +class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().availableProcessors() + 1) : ExperimentScheduler { + private val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher() + private val tickets = Semaphore(parallelism) + + override suspend fun allocate(): ExperimentScheduler.Worker { + tickets.acquire() + return object : ExperimentScheduler.Worker { + override suspend fun invoke( + descriptor: ExperimentDescriptor, + context: ExperimentExecutionContext + ): ExperimentExecutionResult = supervisorScope { + val listener = + object : ExperimentExecutionListener { + override fun descriptorRegistered(descriptor: ExperimentDescriptor) { + launch { context.listener.descriptorRegistered(descriptor) } + } + + override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) { + launch { context.listener.executionFinished(descriptor, result) } + } + + override fun executionStarted(descriptor: ExperimentDescriptor) { + launch { context.listener.executionStarted(descriptor) } + } + } + + val newContext = object : ExperimentExecutionContext by context { + override val listener: ExperimentExecutionListener = listener + } + + try { + withContext(dispatcher) { + descriptor(newContext) + ExperimentExecutionResult.Success + } + } catch (e: Throwable) { + ExperimentExecutionResult.Failed(e) + } finally { + tickets.release() + } + } + } + } + + override fun close() = dispatcher.close() +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt new file mode 100644 index 00000000..3b80276f --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt @@ -0,0 +1,62 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.runner.internal + +import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import com.atlarge.opendc.experiments.sc20.runner.ExperimentRunner +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult +import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler +import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap + +/** + * The default implementation of the [ExperimentRunner] interface. + * + * @param scheduler The scheduler to use. + */ +public class DefaultExperimentRunner(val scheduler: ExperimentScheduler) : ExperimentRunner { + override val id: String = "default" + + override val version: String? = "1.0" + + override fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) = runBlocking { + val context = object : ExperimentExecutionContext { + override val listener: ExperimentExecutionListener = listener + override val scheduler: ExperimentScheduler = this@DefaultExperimentRunner.scheduler + override val cache: MutableMap = ConcurrentHashMap() + } + + listener.descriptorRegistered(root) + context.listener.executionStarted(root) + try { + root(context) + context.listener.executionFinished(root, ExperimentExecutionResult.Success) + } catch (e: Throwable) { + context.listener.executionFinished(root, ExperimentExecutionResult.Failed(e)) + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt new file mode 100644 index 00000000..c1e14e2a --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +/** + * An event that occurs within the system. + */ +public abstract class Event(val name: String) { + /** + * The time of occurrence of this event. + */ + public abstract val timestamp: Long +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt new file mode 100644 index 00000000..8e91bca2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +import com.atlarge.opendc.compute.core.Server + +/** + * A periodic report of the host machine metrics. + */ +data class HostEvent( + override val timestamp: Long, + val duration: Long, + val host: Server, + val vmCount: Int, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double, + val powerDraw: Double +) : Event("host-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt new file mode 100644 index 00000000..df619632 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +/** + * A periodic report of the provisioner's metrics. + */ +data class ProvisionerEvent( + override val timestamp: Long, + val totalHostCount: Int, + val availableHostCount: Int, + val totalVmCount: Int, + val activeVmCount: Int, + val inactiveVmCount: Int, + val waitingVmCount: Int, + val failedVmCount: Int +) : Event("provisioner-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt new file mode 100644 index 00000000..497d2c3f --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +import com.atlarge.opendc.experiments.sc20.experiment.Run + +/** + * A periodic report of the host machine metrics. + */ +data class RunEvent( + val run: Run, + override val timestamp: Long +) : Event("run") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt new file mode 100644 index 00000000..7289fb21 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry + +import com.atlarge.opendc.compute.core.Server + +/** + * A periodic report of a virtual machine's metrics. + */ +data class VmEvent( + override val timestamp: Long, + val duration: Long, + val vm: Server, + val host: Server, + val requestedBurst: Long, + val grantedBurst: Long, + val overcommissionedBurst: Long, + val interferedBurst: Long, + val cpuUsage: Double, + val cpuDemand: Double +) : Event("vm-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt new file mode 100644 index 00000000..a69bd4b2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -0,0 +1,121 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.Event +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * A writer that writes events in Parquet format. + */ +public open class ParquetEventWriter( + private val path: File, + private val schema: Schema, + private val converter: (T, GenericData.Record) -> Unit, + private val bufferSize: Int = 4096 +) : Runnable, Closeable { + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = true, name = "parquet-writer") { run() } + + /** + * Write the specified metrics to the database. + */ + public fun write(event: T) { + queue.put(Action.Write(event)) + } + + /** + * Signal the writer to stop. + */ + public override fun close() { + queue.put(Action.Stop) + writerThread.join() + } + + /** + * Start the writer thread. + */ + override fun run() { + val writer = AvroParquetWriter.builder(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + try { + loop@ while (true) { + val action = queue.take() + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> { + val record = GenericData.Record(schema) + @Suppress("UNCHECKED_CAST") + converter(action.event as T, record) + writer.write(record) + } + } + } + } catch (e: Throwable) { + logger.error("Writer failed", e) + } finally { + writer.close() + } + } + + sealed class Action { + /** + * A poison pill that will stop the writer thread. + */ + object Stop : Action() + + /** + * Write the specified metrics to the database. + */ + data class Write(val event: T) : Action() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt new file mode 100644 index 00000000..7e5ad911 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt @@ -0,0 +1,81 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +/** + * A Parquet event writer for [HostEvent]s. + */ +public class ParquetHostEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "host-writer" + + companion object { + val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> + // record.put("portfolio_id", event.run.parent.parent.id) + // record.put("scenario_id", event.run.parent.id) + // record.put("run_id", event.run.id) + record.put("host_id", event.host.name) + record.put("state", event.host.state.name) + record.put("timestamp", event.timestamp) + record.put("duration", event.duration) + record.put("vm_count", event.vmCount) + record.put("requested_burst", event.requestedBurst) + record.put("granted_burst", event.grantedBurst) + record.put("overcommissioned_burst", event.overcommissionedBurst) + record.put("interfered_burst", event.interferedBurst) + record.put("cpu_usage", event.cpuUsage) + record.put("cpu_demand", event.cpuDemand) + record.put("power_draw", event.powerDraw) + } + + val schema: Schema = SchemaBuilder + .record("host_metrics") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + // .name("portfolio_id").type().intType().noDefault() + // .name("scenario_id").type().intType().noDefault() + // .name("run_id").type().intType().noDefault() + .name("timestamp").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("vm_count").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .endRecord() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt new file mode 100644 index 00000000..1f3b0472 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +/** + * A Parquet event writer for [ProvisionerEvent]s. + */ +public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "provisioner-writer" + + companion object { + val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> + record.put("timestamp", event.timestamp) + record.put("host_total_count", event.totalHostCount) + record.put("host_available_count", event.availableHostCount) + record.put("vm_total_count", event.totalVmCount) + record.put("vm_active_count", event.activeVmCount) + record.put("vm_inactive_count", event.inactiveVmCount) + record.put("vm_waiting_count", event.waitingVmCount) + record.put("vm_failed_count", event.failedVmCount) + } + + val schema: Schema = SchemaBuilder + .record("provisioner_metrics") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_total_count").type().intType().noDefault() + .name("host_available_count").type().intType().noDefault() + .name("vm_total_count").type().intType().noDefault() + .name("vm_active_count").type().intType().noDefault() + .name("vm_inactive_count").type().intType().noDefault() + .name("vm_waiting_count").type().intType().noDefault() + .name("vm_failed_count").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt new file mode 100644 index 00000000..1549b8d2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt @@ -0,0 +1,78 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.telemetry.parquet + +import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import java.io.File + +/** + * A Parquet event writer for [RunEvent]s. + */ +public class ParquetRunEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "run-writer" + + companion object { + val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> + val run = event.run + val scenario = run.parent + val portfolio = scenario.parent + record.put("portfolio_id", portfolio.id) + record.put("portfolio_name", portfolio.name) + record.put("scenario_id", scenario.id) + record.put("run_id", run.id) + record.put("repetitions", scenario.repetitions) + record.put("topology", scenario.topology.name) + record.put("workload_name", scenario.workload.name) + record.put("workload_fraction", scenario.workload.fraction) + record.put("allocation_policy", scenario.allocationPolicy) + record.put("failure_frequency", scenario.operationalPhenomena.failureFrequency) + record.put("interference", scenario.operationalPhenomena.hasInterference) + record.put("seed", run.seed) + } + + val schema: Schema = SchemaBuilder + .record("runs") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("portfolio_id").type().intType().noDefault() + .name("portfolio_name").type().stringType().noDefault() + .name("scenario_id").type().intType().noDefault() + .name("run_id").type().intType().noDefault() + .name("repetitions").type().intType().noDefault() + .name("topology").type().stringType().noDefault() + .name("workload_name").type().stringType().noDefault() + .name("workload_fraction").type().doubleType().noDefault() + .name("allocation_policy").type().stringType().noDefault() + .name("failure_frequency").type().doubleType().noDefault() + .name("interference").type().booleanType().noDefault() + .name("seed").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 28026fde..96b6b426 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -28,8 +28,7 @@ import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.experiments.sc20.Run -import com.atlarge.opendc.experiments.sc20.sampleWorkload +import com.atlarge.opendc.experiments.sc20.experiment.Run import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import kotlin.random.Random diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt new file mode 100644 index 00000000..e03c59bc --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -0,0 +1,69 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.format.trace.TraceEntry +import mu.KotlinLogging +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * Sample the workload for the specified [run]. + */ +fun sampleWorkload(trace: List>, run: Run): List> { + return sampleRegularWorkload(trace, run) +} + +/** + * Sample a regular (non-HPC) workload. + */ +fun sampleRegularWorkload(trace: List>, run: Run): List> { + val fraction = run.parent.workload.fraction + if (fraction >= 1) { + return trace + } + + val shuffled = trace.shuffled(Random(run.seed)) + val res = mutableListOf>() + val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + var currentLoad = 0.0 + + for (entry in shuffled) { + val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt deleted file mode 100644 index 4fcfdb6b..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/util/DatabaseHelper.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.util - -import com.atlarge.opendc.experiments.sc20.Portfolio -import com.atlarge.opendc.experiments.sc20.Run -import com.atlarge.opendc.experiments.sc20.Scenario -import java.io.Closeable -import java.sql.Connection -import java.sql.Statement - -/** - * A helper class for writing to the database. - */ -class DatabaseHelper(val conn: Connection) : Closeable { - /** - * Prepared statement to create experiment. - */ - private val createExperiment = conn.prepareStatement("INSERT INTO experiments DEFAULT VALUES", arrayOf("id")) - - /** - * Prepared statement for creating a portfolio. - */ - private val createPortfolio = conn.prepareStatement("INSERT INTO portfolios (experiment_id, name) VALUES (?, ?)", arrayOf("id")) - - /** - * Prepared statement for creating a scenario - */ - private val createScenario = conn.prepareStatement("INSERT INTO scenarios (portfolio_id, repetitions, topology, workload_name, workload_fraction, allocation_policy, failure_frequency, interference) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", arrayOf("id")) - - /** - * Prepared statement for creating a run. - */ - private val createRun = conn.prepareStatement("INSERT INTO runs (id, scenario_id, seed) VALUES (?, ?, ?)", arrayOf("id")) - - /** - * Prepared statement for starting a run. - */ - private val startRun = conn.prepareStatement("UPDATE runs SET state = 'active'::run_state,start_time = now() WHERE id = ? AND scenario_id = ?") - - /** - * Prepared statement for finishing a run. - */ - private val finishRun = conn.prepareStatement("UPDATE runs SET state = ?::run_state,end_time = now() WHERE id = ? AND scenario_id = ?") - - /** - * Create a new experiment and return its id. - */ - fun createExperiment(): Long { - val affectedRows = createExperiment.executeUpdate() - check(affectedRows != 0) - return createExperiment.latestId - } - - /** - * Persist a [Portfolio] and return its id. - */ - fun persist(portfolio: Portfolio, experimentId: Long): Long { - createPortfolio.setLong(1, experimentId) - createPortfolio.setString(2, portfolio.name) - - val affectedRows = createPortfolio.executeUpdate() - check(affectedRows != 0) - return createPortfolio.latestId - } - - /** - * Persist a [Scenario] and return its id. - */ - fun persist(scenario: Scenario, portfolioId: Long): Long { - createScenario.setLong(1, portfolioId) - createScenario.setInt(2, scenario.repetitions) - createScenario.setString(3, scenario.topology.name) - createScenario.setString(4, scenario.workload.name) - createScenario.setDouble(5, scenario.workload.fraction) - createScenario.setString(6, scenario.allocationPolicy) - createScenario.setDouble(7, scenario.failureFrequency) - createScenario.setBoolean(8, scenario.hasInterference) - - val affectedRows = createScenario.executeUpdate() - check(affectedRows != 0) - return createScenario.latestId - } - - /** - * Persist a [Run] and return its id. - */ - fun persist(run: Run, scenarioId: Long): Int { - createRun.setInt(1, run.id) - createRun.setLong(2, scenarioId) - createRun.setInt(3, run.seed) - - val affectedRows = createRun.executeUpdate() - check(affectedRows != 0) - return createRun.latestId.toInt() - } - - /** - * Start run. - */ - fun startRun(scenario: Long, run: Int) { - startRun.setInt(1, run) - startRun.setLong(2, scenario) - - val affectedRows = startRun.executeUpdate() - check(affectedRows != 0) - } - - /** - * Finish a run. - */ - fun finishRun(scenario: Long, run: Int, hasFailed: Boolean) { - finishRun.setString(1, if (hasFailed) "fail" else "ok") - finishRun.setInt(2, run) - finishRun.setLong(3, scenario) - - val affectedRows = finishRun.executeUpdate() - check(affectedRows != 0) - } - - /** - * Obtain the latest identifier of the specified statement. - */ - private val Statement.latestId: Long - get() { - val rs = generatedKeys - return try { - check(rs.next()) - rs.getLong(1) - } finally { - rs.close() - } - } - - override fun close() { - conn.close() - } -} diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index fd617115..ae3d6db1 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -31,7 +31,12 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter +import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor +import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain +import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner +import com.atlarge.opendc.experiments.sc20.experiment.createTraceReader +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor +import com.atlarge.opendc.experiments.sc20.experiment.processTrace import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader @@ -96,19 +101,33 @@ class Sc20IntegrationTest { lateinit var scheduler: SimpleVirtProvisioningService root.launch { - val res = createProvisioner(root, environmentReader, allocationPolicy) + val res = createProvisioner( + root, + environmentReader, + allocationPolicy + ) val bareMetalProvisioner = res.first scheduler = res.second val failureDomain = if (failures) { println("ENABLING failures") - createFailureDomain(seed, 24.0 * 7, bareMetalProvisioner, chan) + createFailureDomain( + seed, + 24.0 * 7, + bareMetalProvisioner, + chan + ) } else { null } attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, monitor) + processTrace( + traceReader, + scheduler, + chan, + monitor + ) println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") @@ -141,7 +160,12 @@ class Sc20IntegrationTest { val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json") val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) .construct() - return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0) + return createTraceReader( + File("src/test/resources/trace"), + performanceInterferenceModel, + emptyList(), + 0 + ) } /** @@ -152,7 +176,7 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestExperimentReporter : ExperimentReporter { + class TestExperimentReporter : ExperimentMonitor { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L -- cgit v1.2.3