diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-12 20:35:46 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-12 21:02:49 +0200 |
| commit | 24fd4828d6798c19476543fa16df87d45811b54e (patch) | |
| tree | f982d0e65e8088bbf5251ec2a9b8a2ed9b26e8d2 /opendc/opendc-experiments-sc20/src | |
| parent | c26d278865ff8e6be35f6899337fe129889f887a (diff) | |
refactor: Restructure experiment setup
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
8 files changed, 139 insertions, 45 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt index e37dea8b..e8222eb0 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -39,6 +39,8 @@ import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -105,7 +107,12 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): F * Create the trace reader from which the VM workloads are read. */ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) + return Sc20ParquetTraceReader( + path, + performanceInterferenceModel, + vms, + Random(seed) + ) } /** @@ -134,7 +141,7 @@ suspend fun createProvisioner( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) { +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) { val domain = simulationContext.domain val hypervisors = scheduler.drivers() @@ -178,7 +185,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) { +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: ExperimentReporter, vmPlacements: Map<String, String> = emptyMap()) { val domain = simulationContext.domain try { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index 51448c9e..b2fbba39 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,13 +25,20 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -72,14 +79,14 @@ private val logger = KotlinLogging.logger {} /** * Represents the command for running the experiment. */ -class ExperimentCommand : CliktCommand(name = "sc20-experiment") { +class ExperimentCli : CliktCommand(name = "sc20-experiment") { private val environment by option("--environment-file", help = "path to the environment file") .file() .required() private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file") .file() .convert { it.inputStream() as InputStream } - .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") } + .defaultLazy { ExperimentCli::class.java.getResourceAsStream("/env/performance-interference.json") } private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file") .file() @@ -111,13 +118,13 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") { ) .default("core-mem") - private val trace by option("--trace-directory", help = "path to the trace directory") - .file(canBeFile = false) - .required() + private val trace by option().groupChoice( + "sc20-parquet" to Trace.Sc20Parquet() + ).required() private val reporter by option().groupChoice( - "parquet" to Parquet(), - "postgres" to Postgres() + "parquet" to Reporter.Parquet(), + "postgres" to Reporter.Postgres() ).required() private fun parseVMs(string: String): List<String> { @@ -135,7 +142,7 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") { logger.info("allocation-policy: $allocationPolicy") val start = System.currentTimeMillis() - val reporter: Sc20Reporter = reporter.createReporter() + val reporter: ExperimentReporter = reporter.createReporter() val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") @@ -164,7 +171,7 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") { } val environmentReader = Sc20ClusterEnvironmentReader(environment) val traceReader = try { - createTraceReader(trace, performanceInterferenceModel, selectedVms, seed) + trace.createTraceReader(performanceInterferenceModel, selectedVms, seed) } catch (e: Throwable) { reporter.close() throw e @@ -204,32 +211,68 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") { } } -sealed class Reporter(name: String) : OptionGroup(name) { +/** + * An option for specifying the type of reporter to use. + */ +internal sealed class Reporter(name: String) : OptionGroup(name) { /** - * Create the [Sc20Reporter] for this option. + * Create the [ExperimentReporter] for this option. */ - abstract fun createReporter(): Sc20Reporter -} + abstract fun createReporter(): ExperimentReporter -class Parquet : Reporter("Options for reporting using Parquet") { - private val path by option(help = "path to where the output should be stored") - .file() - .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } + class Parquet : Reporter("Options for reporting using Parquet") { + private val path by option("--parquet-path", help = "path to where the output should be stored") + .file() + .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") } + + override fun createReporter(): ExperimentReporter = + ExperimentParquetReporter(path) + } + + class Postgres : Reporter("Options for reporting using PostgreSQL") { + private val url by option("--postgres-url", help = "JDBC connection url").required() + private val experimentId by option(help = "Experiment ID").long().required() - override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path) + override fun createReporter(): ExperimentReporter { + val conn = DriverManager.getConnection(url) + return ExperimentPostgresReporter(conn, experimentId) + } + } } -class Postgres : Reporter("Options for reporting using PostgreSQL") { - private val url by option(help = "JDBC connection url").required() - private val experimentId by option(help = "Experiment ID").long().required() +/** + * An option for specifying the type of trace to use. + */ +internal sealed class Trace(type: String) : OptionGroup(type) { + /** + * Create a [TraceReader] for this type of trace. + */ + abstract fun createTraceReader(performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): TraceReader<VmWorkload> + + class Sc20Parquet : Trace("SC20 Parquet format") { + /** + * Path to trace directory. + */ + private val path by option("--trace-path", help = "path to the trace directory") + .file(canBeFile = false) + .required() - override fun createReporter(): Sc20Reporter { - val conn = DriverManager.getConnection(url) - return Sc20PostgresReporter(conn, experimentId) + override fun createTraceReader( + performanceInterferenceModel: PerformanceInterferenceModel, + vms: List<String>, + seed: Int + ): TraceReader<VmWorkload> { + return Sc20ParquetTraceReader( + path, + performanceInterferenceModel, + vms, + Random(seed) + ) + } } } /** * Main entry point of the experiment. */ -fun main(args: Array<String>) = ExperimentCommand().main(args) +fun main(args: Array<String>) = ExperimentCli().main(args) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt index 84500417..0403a3b5 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt @@ -22,13 +22,16 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver import java.io.Closeable -interface Sc20Reporter : Closeable { +/** + * A reporter used by experiments to report metrics. + */ +interface ExperimentReporter : Closeable { /** * This method is invoked when the state of a VM changes. */ diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt index f2139144..6b3351d4 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt @@ -1,4 +1,28 @@ -package com.atlarge.opendc.experiments.sc20 +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server @@ -18,7 +42,8 @@ import kotlin.concurrent.thread private val logger = KotlinLogging.logger {} -class Sc20ParquetReporter(destination: File) : Sc20Reporter { +class ExperimentParquetReporter(destination: File) : + ExperimentReporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() private val schema = SchemaBuilder .record("slice") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt index 1b91e843..18019aa5 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server @@ -37,7 +37,7 @@ import kotlin.concurrent.thread private val logger = KotlinLogging.logger {} -class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter { +class ExperimentPostgresReporter(val conn: Connection, val experimentId: Long) : ExperimentReporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() private val queue = ArrayBlockingQueue<Action>(2048) private val writerThread = thread(start = true, name = "sc20-writer") { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 8ae1693c..8a204ca3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.trace import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage @@ -82,7 +82,11 @@ class Sc20ParquetTraceReader( if (selectedVms.isEmpty()) null else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) + )) /** * A poisonous fragment. @@ -231,7 +235,8 @@ class Sc20ParquetTraceReader( Random(random.nextInt()) ) val vmWorkload = VmWorkload( - uid, "VM Workload $id", UnnamedUser, + uid, "VM Workload $id", + UnnamedUser, VmImage( uid, id, @@ -242,7 +247,10 @@ class Sc20ParquetTraceReader( ) ) - TraceEntryImpl(submissionTime, vmWorkload) + TraceEntryImpl( + submissionTime, + vmWorkload + ) } .sortedBy { it.submissionTime } .toList() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index c62f59f9..04cdd302 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.atlarge.opendc.experiments.sc20 +package com.atlarge.opendc.experiments.sc20.trace import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData @@ -139,7 +139,14 @@ fun main(args: Array<String>) { ) } else { val fragment = - Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores) + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) if (last != null) { yield(last!!) } diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 239d018a..5177c04a 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader @@ -63,7 +64,7 @@ class Sc20IntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestSc20Reporter + private lateinit var monitor: TestExperimentReporter /** * Setup the experimental environment. @@ -73,7 +74,7 @@ class Sc20IntegrationTest { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() simulationEngine = provider("test") root = simulationEngine.newDomain("root") - monitor = TestSc20Reporter() + monitor = TestExperimentReporter() } /** @@ -151,7 +152,7 @@ class Sc20IntegrationTest { return Sc20ClusterEnvironmentReader(stream) } - class TestSc20Reporter : Sc20Reporter { + class TestExperimentReporter : ExperimentReporter { var totalRequestedBurst = 0L var totalGrantedBurst = 0L var totalOvercommissionedBurst = 0L |
