summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt13
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt)95
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt)7
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt)29
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt)4
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt)18
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt)11
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt7
8 files changed, 139 insertions, 45 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
index e37dea8b..e8222eb0 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
@@ -39,6 +39,8 @@ import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -105,7 +107,12 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): F
* Create the trace reader from which the VM workloads are read.
*/
fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
- return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
+ return Sc20ParquetTraceReader(
+ path,
+ performanceInterferenceModel,
+ vms,
+ Random(seed)
+ )
}
/**
@@ -134,7 +141,7 @@ suspend fun createProvisioner(
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) {
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: ExperimentReporter) {
val domain = simulationContext.domain
val hypervisors = scheduler.drivers()
@@ -178,7 +185,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) {
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: ExperimentReporter, vmPlacements: Map<String, String> = emptyMap()) {
val domain = simulationContext.domain
try {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index 51448c9e..b2fbba39 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -25,13 +25,20 @@
package com.atlarge.opendc.experiments.sc20
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentParquetReporter
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import com.atlarge.opendc.format.trace.TraceReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
@@ -72,14 +79,14 @@ private val logger = KotlinLogging.logger {}
/**
* Represents the command for running the experiment.
*/
-class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
+class ExperimentCli : CliktCommand(name = "sc20-experiment") {
private val environment by option("--environment-file", help = "path to the environment file")
.file()
.required()
private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file")
.file()
.convert { it.inputStream() as InputStream }
- .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") }
+ .defaultLazy { ExperimentCli::class.java.getResourceAsStream("/env/performance-interference.json") }
private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
.file()
@@ -111,13 +118,13 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
)
.default("core-mem")
- private val trace by option("--trace-directory", help = "path to the trace directory")
- .file(canBeFile = false)
- .required()
+ private val trace by option().groupChoice(
+ "sc20-parquet" to Trace.Sc20Parquet()
+ ).required()
private val reporter by option().groupChoice(
- "parquet" to Parquet(),
- "postgres" to Postgres()
+ "parquet" to Reporter.Parquet(),
+ "postgres" to Reporter.Postgres()
).required()
private fun parseVMs(string: String): List<String> {
@@ -135,7 +142,7 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
logger.info("allocation-policy: $allocationPolicy")
val start = System.currentTimeMillis()
- val reporter: Sc20Reporter = reporter.createReporter()
+ val reporter: ExperimentReporter = reporter.createReporter()
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider("test")
@@ -164,7 +171,7 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
}
val environmentReader = Sc20ClusterEnvironmentReader(environment)
val traceReader = try {
- createTraceReader(trace, performanceInterferenceModel, selectedVms, seed)
+ trace.createTraceReader(performanceInterferenceModel, selectedVms, seed)
} catch (e: Throwable) {
reporter.close()
throw e
@@ -204,32 +211,68 @@ class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
}
}
-sealed class Reporter(name: String) : OptionGroup(name) {
+/**
+ * An option for specifying the type of reporter to use.
+ */
+internal sealed class Reporter(name: String) : OptionGroup(name) {
/**
- * Create the [Sc20Reporter] for this option.
+ * Create the [ExperimentReporter] for this option.
*/
- abstract fun createReporter(): Sc20Reporter
-}
+ abstract fun createReporter(): ExperimentReporter
-class Parquet : Reporter("Options for reporting using Parquet") {
- private val path by option(help = "path to where the output should be stored")
- .file()
- .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") }
+ class Parquet : Reporter("Options for reporting using Parquet") {
+ private val path by option("--parquet-path", help = "path to where the output should be stored")
+ .file()
+ .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") }
+
+ override fun createReporter(): ExperimentReporter =
+ ExperimentParquetReporter(path)
+ }
+
+ class Postgres : Reporter("Options for reporting using PostgreSQL") {
+ private val url by option("--postgres-url", help = "JDBC connection url").required()
+ private val experimentId by option(help = "Experiment ID").long().required()
- override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path)
+ override fun createReporter(): ExperimentReporter {
+ val conn = DriverManager.getConnection(url)
+ return ExperimentPostgresReporter(conn, experimentId)
+ }
+ }
}
-class Postgres : Reporter("Options for reporting using PostgreSQL") {
- private val url by option(help = "JDBC connection url").required()
- private val experimentId by option(help = "Experiment ID").long().required()
+/**
+ * An option for specifying the type of trace to use.
+ */
+internal sealed class Trace(type: String) : OptionGroup(type) {
+ /**
+ * Create a [TraceReader] for this type of trace.
+ */
+ abstract fun createTraceReader(performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): TraceReader<VmWorkload>
+
+ class Sc20Parquet : Trace("SC20 Parquet format") {
+ /**
+ * Path to trace directory.
+ */
+ private val path by option("--trace-path", help = "path to the trace directory")
+ .file(canBeFile = false)
+ .required()
- override fun createReporter(): Sc20Reporter {
- val conn = DriverManager.getConnection(url)
- return Sc20PostgresReporter(conn, experimentId)
+ override fun createTraceReader(
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ vms: List<String>,
+ seed: Int
+ ): TraceReader<VmWorkload> {
+ return Sc20ParquetTraceReader(
+ path,
+ performanceInterferenceModel,
+ vms,
+ Random(seed)
+ )
+ }
}
}
/**
* Main entry point of the experiment.
*/
-fun main(args: Array<String>) = ExperimentCommand().main(args)
+fun main(args: Array<String>) = ExperimentCli().main(args)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
index 84500417..0403a3b5 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
@@ -22,13 +22,16 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.reporter
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import java.io.Closeable
-interface Sc20Reporter : Closeable {
+/**
+ * A reporter used by experiments to report metrics.
+ */
+interface ExperimentReporter : Closeable {
/**
* This method is invoked when the state of a VM changes.
*/
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
index f2139144..6b3351d4 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
@@ -1,4 +1,28 @@
-package com.atlarge.opendc.experiments.sc20
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
@@ -18,7 +42,8 @@ import kotlin.concurrent.thread
private val logger = KotlinLogging.logger {}
-class Sc20ParquetReporter(destination: File) : Sc20Reporter {
+class ExperimentParquetReporter(destination: File) :
+ ExperimentReporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
private val schema = SchemaBuilder
.record("slice")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
index 1b91e843..18019aa5 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.reporter
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
@@ -37,7 +37,7 @@ import kotlin.concurrent.thread
private val logger = KotlinLogging.logger {}
-class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter {
+class ExperimentPostgresReporter(val conn: Connection, val experimentId: Long) : ExperimentReporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
private val queue = ArrayBlockingQueue<Action>(2048)
private val writerThread = thread(start = true, name = "sc20-writer") {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
index 8ae1693c..8a204ca3 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.trace
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
@@ -82,7 +82,11 @@ class Sc20ParquetTraceReader(
if (selectedVms.isEmpty())
null
else
- FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
+ FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"),
+ SelectedVmFilter(
+ TreeSet(selectedVms)
+ )
+ ))
/**
* A poisonous fragment.
@@ -231,7 +235,8 @@ class Sc20ParquetTraceReader(
Random(random.nextInt())
)
val vmWorkload = VmWorkload(
- uid, "VM Workload $id", UnnamedUser,
+ uid, "VM Workload $id",
+ UnnamedUser,
VmImage(
uid,
id,
@@ -242,7 +247,10 @@ class Sc20ParquetTraceReader(
)
)
- TraceEntryImpl(submissionTime, vmWorkload)
+ TraceEntryImpl(
+ submissionTime,
+ vmWorkload
+ )
}
.sortedBy { it.submissionTime }
.toList()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index c62f59f9..04cdd302 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.trace
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
@@ -139,7 +139,14 @@ fun main(args: Array<String>) {
)
} else {
val fragment =
- Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores)
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
if (last != null) {
yield(last!!)
}
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 239d018a..5177c04a 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
@@ -63,7 +64,7 @@ class Sc20IntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestSc20Reporter
+ private lateinit var monitor: TestExperimentReporter
/**
* Setup the experimental environment.
@@ -73,7 +74,7 @@ class Sc20IntegrationTest {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
simulationEngine = provider("test")
root = simulationEngine.newDomain("root")
- monitor = TestSc20Reporter()
+ monitor = TestExperimentReporter()
}
/**
@@ -151,7 +152,7 @@ class Sc20IntegrationTest {
return Sc20ClusterEnvironmentReader(stream)
}
- class TestSc20Reporter : Sc20Reporter {
+ class TestExperimentReporter : ExperimentReporter {
var totalRequestedBurst = 0L
var totalGrantedBurst = 0L
var totalOvercommissionedBurst = 0L