summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildSrc/src/main/kotlin/library.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt36
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt103
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt49
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt6
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts9
-rw-r--r--opendc/opendc-experiments-sc20/schema.sql22
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt159
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt235
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt151
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt200
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt78
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt)69
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt90
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt155
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt146
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt48
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt33
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt30
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt30
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt)35
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt145
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt75
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt68
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt81
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt51
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt32
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt45
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt48
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt42
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt59
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt85
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt62
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt35
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt44
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt39
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt35
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt43
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt121
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt67
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt78
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt88
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt167
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt)24
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt)11
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt69
-rw-r--r--opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json7
-rw-r--r--opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json21
-rw-r--r--opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json36
-rw-r--r--opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml5
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt55
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt3
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt2
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt30
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt2
57 files changed, 2757 insertions, 787 deletions
diff --git a/buildSrc/src/main/kotlin/library.kt b/buildSrc/src/main/kotlin/library.kt
index 3b05f3a4..de88f114 100644
--- a/buildSrc/src/main/kotlin/library.kt
+++ b/buildSrc/src/main/kotlin/library.kt
@@ -45,5 +45,5 @@ object Library {
/**
* Kotlin coroutines support
*/
- val KOTLINX_COROUTINES = "1.3.5"
+ val KOTLINX_COROUTINES = "1.3.6"
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
index 45024a49..f458877b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
@@ -40,30 +40,12 @@ const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference"
* @param items The [PerformanceInterferenceModelItem]s that make up this model.
*/
class PerformanceInterferenceModel(
- items: Set<PerformanceInterferenceModelItem>,
+ val items: SortedSet<PerformanceInterferenceModelItem>,
val random: Random = Random(0)
) {
private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList()
- private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs ->
- var cmp = lhs.performanceScore.compareTo(rhs.performanceScore)
- if (cmp != 0) {
- return@Comparator cmp
- }
-
- cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad)
- if (cmp != 0) {
- return@Comparator cmp
- }
-
- lhs.hashCode().compareTo(rhs.hashCode())
- }
- val items = TreeSet(comparator)
private val colocatedWorkloads = TreeSet<String>()
- init {
- this.items.addAll(items)
- }
-
fun vmStarted(server: Server) {
colocatedWorkloads.add(server.image.name)
intersectingItems = items.filter { item -> doesMatch(item) }
@@ -113,7 +95,7 @@ data class PerformanceInterferenceModelItem(
val workloadNames: SortedSet<String>,
val minServerLoad: Double,
val performanceScore: Double
-) {
+) : Comparable<PerformanceInterferenceModelItem> {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
@@ -126,4 +108,18 @@ data class PerformanceInterferenceModelItem(
}
override fun hashCode(): Int = workloadNames.hashCode()
+
+ override fun compareTo(other: PerformanceInterferenceModelItem): Int {
+ var cmp = performanceScore.compareTo(other.performanceScore)
+ if (cmp != 0) {
+ return cmp
+ }
+
+ cmp = minServerLoad.compareTo(other.minServerLoad)
+ if (cmp != 0) {
+ return cmp
+ }
+
+ return hashCode().compareTo(other.hashCode())
+ }
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 53fa463b..ce814dd8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -339,7 +339,7 @@ class SimpleVirtDriver(
min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
totalOvercommissionedBurst,
totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
- totalAllocatedUsage,
+ min(totalAllocatedUsage, totalRequestedUsage), // The allocated usage might be slightly higher due to FP rounding
totalRequestedUsage,
vmCount, // Some VMs might already have finished, so keep initial VM count
server
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 3603ae69..c3d9c745 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,6 +1,7 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
@@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -57,11 +59,11 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- public var submittedVms = 0L
- public var queuedVms = 0L
- public var runningVms = 0L
- public var finishedVms = 0L
- public var unscheduledVms = 0L
+ public var submittedVms = 0
+ public var queuedVms = 0
+ public var runningVms = 0
+ public var finishedVms = 0
+ public var unscheduledVms = 0
private var maxCores = 0
private var maxMemory = 0L
@@ -71,6 +73,13 @@ class SimpleVirtProvisioningService(
*/
private val allocationLogic = allocationPolicy()
+ /**
+ * The [EventFlow] to emit the events.
+ */
+ internal val eventFlow = EventFlow<VirtProvisioningEvent>()
+
+ override val events: Flow<VirtProvisioningEvent> = eventFlow
+
init {
launch {
val provisionedNodes = provisioningService.nodes()
@@ -96,8 +105,17 @@ class SimpleVirtProvisioningService(
image: Image,
flavor: Flavor
): Server = withContext(coroutineContext) {
- submittedVms++
- queuedVms++
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ ++submittedVms,
+ runningVms,
+ finishedVms,
+ ++queuedVms,
+ unscheduledVms
+ ))
+
suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
@@ -141,7 +159,17 @@ class SimpleVirtProvisioningService(
if (selectedHv == null) {
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- unscheduledVms++
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ ++unscheduledVms
+ ))
+
incomingImages -= imageInstance
logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]")
@@ -168,8 +196,17 @@ class SimpleVirtProvisioningService(
)
imageInstance.server = server
imageInstance.continuation.resume(server)
- queuedVms--
- runningVms++
+
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ ))
activeImages += imageInstance
server.events
@@ -178,8 +215,17 @@ class SimpleVirtProvisioningService(
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
- runningVms--
- finishedVms++
+
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -223,11 +269,33 @@ class SimpleVirtProvisioningService(
maxMemory = max(maxMemory, server.flavor.memorySize)
hypervisors[server] = hv
}
+
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
+
if (incomingImages.isNotEmpty()) {
requestCycle()
}
@@ -242,6 +310,17 @@ class SimpleVirtProvisioningService(
hv.driver = server.services[VirtDriver]
availableHypervisors += hv
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
+
hv.driver.events
.onEach { event ->
if (event is HypervisorEvent.VmsUpdated) {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt
new file mode 100644
index 00000000..c3fb99f9
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt
@@ -0,0 +1,49 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt.service
+
+/**
+ * An event that is emitted by the [VirtProvisioningService].
+ */
+public sealed class VirtProvisioningEvent {
+ /**
+ * The service that has emitted the event.
+ */
+ public abstract val provisioner: VirtProvisioningService
+
+ /**
+ * An event emitted for writing metrics.
+ */
+ data class MetricsAvailable(
+ override val provisioner: VirtProvisioningService,
+ public val totalHostCount: Int,
+ public val availableHostCount: Int,
+ public val totalVmCount: Int,
+ public val activeVmCount: Int,
+ public val inactiveVmCount: Int,
+ public val waitingVmCount: Int,
+ public val failedVmCount: Int
+ ) : VirtProvisioningEvent()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 2ad7df84..c4cbd711 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -5,6 +5,7 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import kotlinx.coroutines.flow.Flow
/**
* A service for VM provisioning on a cloud.
@@ -16,6 +17,11 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
+ * The events emitted by the service.
+ */
+ public val events: Flow<VirtProvisioningEvent>
+
+ /**
* Obtain the active hypervisors for this provisioner.
*/
public suspend fun drivers(): Set<VirtDriver>
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 6b6366a7..46d99564 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -31,23 +31,26 @@ plugins {
}
application {
- mainClassName = "com.atlarge.opendc.experiments.sc20.Sc20ExperimentKt"
- applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M")
+ mainClassName = "com.atlarge.opendc.experiments.sc20.MainKt"
+ applicationDefaultJvmArgs = listOf("-Xms2500M")
}
dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
implementation(kotlin("stdlib"))
+
implementation("com.github.ajalt:clikt:2.6.0")
+ implementation("me.tongfei:progressbar:0.8.1")
implementation("io.github.microutils:kotlin-logging:1.7.9")
+
implementation("org.apache.parquet:parquet-avro:1.11.0")
implementation("org.apache.hadoop:hadoop-client:3.2.1") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
+
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
- runtimeOnly("org.postgresql:postgresql:42.2.12")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
diff --git a/opendc/opendc-experiments-sc20/schema.sql b/opendc/opendc-experiments-sc20/schema.sql
deleted file mode 100644
index 51990a75..00000000
--- a/opendc/opendc-experiments-sc20/schema.sql
+++ /dev/null
@@ -1,22 +0,0 @@
-DROP TABLE IF EXISTS host_reports;
-CREATE TABLE host_reports (
- id BIGSERIAL PRIMARY KEY NOT NULL,
- experiment_id BIGINT NOT NULL,
- time BIGINT NOT NULL,
- duration BIGINT NOT NULL,
- requested_burst BIGINT NOT NULL,
- granted_burst BIGINT NOT NULL,
- overcommissioned_burst BIGINT NOT NULL,
- interfered_burst BIGINT NOT NULL,
- cpu_usage DOUBLE PRECISION NOT NULL,
- cpu_demand DOUBLE PRECISION NOT NULL,
- image_count INTEGER NOT NULL,
- server TEXT NOT NULL,
- host_state TEXT NOT NULL,
- host_usage DOUBLE PRECISION NOT NULL,
- power_draw DOUBLE PRECISION NOT NULL,
- total_submitted_vms BIGINT NOT NULL,
- total_queued_vms BIGINT NOT NULL,
- total_running_vms BIGINT NOT NULL,
- total_finished_vms BIGINT NOT NULL
-);
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
new file mode 100644
index 00000000..14de52b8
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -0,0 +1,159 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.experiments.sc20.experiment.Experiment
+import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio
+import com.atlarge.opendc.experiments.sc20.experiment.Portfolio
+import com.atlarge.opendc.experiments.sc20.experiment.TestPortfolio
+import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ThreadPoolExperimentScheduler
+import com.atlarge.opendc.experiments.sc20.runner.internal.DefaultExperimentRunner
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.multiple
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.types.choice
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import mu.KotlinLogging
+import java.io.File
+import java.io.InputStream
+
+/**
+ * The logger for this experiment.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Represents the command for running the experiment.
+ */
+class ExperimentCli : CliktCommand(name = "sc20-experiment") {
+ /**
+ * The path to the directory where the topology descriptions are located.
+ */
+ private val environmentPath by option("--environment-path", help = "path to the environment directory")
+ .file(canBeFile = false)
+ .required()
+
+ /**
+ * The path to the directory where the traces are located.
+ */
+ private val tracePath by option("--trace-path", help = "path to the traces directory")
+ .file(canBeFile = false)
+ .required()
+
+ /**
+ * The path to the performance interference model.
+ */
+ private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file")
+ .file()
+ .convert { it.inputStream() as InputStream }
+
+ /**
+ * The path to the original VM placements file.
+ */
+ private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
+ .file()
+ .convert {
+ Sc20VmPlacementReader(it.inputStream().buffered()).construct()
+ }
+ .default(emptyMap())
+
+ /**
+ * The selected portfolios to run.
+ */
+ private val portfolios by option("--portfolio")
+ .choice(
+ "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio,
+ "more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) },
+ "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) },
+ "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) },
+ "test" to { experiment, i -> TestPortfolio(experiment, i) },
+ ignoreCase = true
+ )
+ .multiple()
+
+ /**
+ * The maximum number of worker threads to use.
+ */
+ private val parallelism by option("--parallelism")
+ .int()
+ .default(Runtime.getRuntime().availableProcessors())
+
+ /**
+ * The buffer size for writing results.
+ */
+ private val bufferSize by option("--buffer-size")
+ .int()
+ .default(4096)
+
+ /**
+ * The path to the output directory.
+ */
+ private val output by option("-O", "--output", help = "path to the output directory")
+ .file(canBeFile = false)
+ .defaultLazy { File("data") }
+
+ override fun run() {
+ logger.info { "Constructing performance interference model" }
+
+ val performanceInterferenceModel =
+ performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it) }
+
+ logger.info { "Creating experiment descriptor" }
+ val descriptor = object : Experiment(environmentPath, tracePath, output, performanceInterferenceModel, vmPlacements, bufferSize) {
+ private val descriptor = this
+ override val children: Sequence<ExperimentDescriptor> = sequence {
+ for ((i, producer) in portfolios.withIndex()) {
+ yield(producer(descriptor, i))
+ }
+ }
+ }
+
+ logger.info { "Starting experiment runner [parallelism=$parallelism]" }
+ val scheduler = ThreadPoolExperimentScheduler(parallelism)
+ val runner = DefaultExperimentRunner(scheduler)
+ try {
+ runner.execute(descriptor, ConsoleExperimentReporter())
+ } finally {
+ scheduler.close()
+ }
+ }
+}
+
+/**
+ * Main entry point of the experiment.
+ */
+fun main(args: Array<String>) = ExperimentCli().main(args)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
deleted file mode 100644
index 51448c9e..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20
-
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
-import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
-import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
-import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
-import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.default
-import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.groups.mutuallyExclusiveOptions
-import com.github.ajalt.clikt.parameters.groups.required
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.flag
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.types.choice
-import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.int
-import com.github.ajalt.clikt.parameters.types.long
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
-import mu.KotlinLogging
-import java.io.File
-import java.io.FileReader
-import java.io.InputStream
-import java.sql.DriverManager
-import java.util.ServiceLoader
-import kotlin.random.Random
-
-/**
- * The logger for this experiment.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * Represents the command for running the experiment.
- */
-class ExperimentCommand : CliktCommand(name = "sc20-experiment") {
- private val environment by option("--environment-file", help = "path to the environment file")
- .file()
- .required()
- private val performanceInterferenceStream by option("--performance-interference-file", help = "path to the performance interference file")
- .file()
- .convert { it.inputStream() as InputStream }
- .defaultLazy { ExperimentCommand::class.java.getResourceAsStream("/env/performance-interference.json") }
-
- private val vmPlacements by option("--vm-placements-file", help = "path to the VM placement file")
- .file()
- .convert {
- Sc20VmPlacementReader(it.inputStream().buffered()).construct()
- }
- .default(emptyMap())
-
- private val selectedVms by mutuallyExclusiveOptions(
- option("--selected-vms", help = "the VMs to run").convert { parseVMs(it) },
- option("--selected-vms-file").file().convert { parseVMs(FileReader(it).readText()) }
- ).default(emptyList())
-
- private val seed by option(help = "the random seed")
- .int()
- .default(0)
- private val failures by option("-x", "--failures", help = "enable (correlated) machine failures")
- .flag()
- private val failureInterval by option(help = "expected number of hours between failures")
- .int()
- .default(24 * 7) // one week
- private val allocationPolicy by option(help = "name of VM allocation policy to use")
- .choice(
- "mem", "mem-inv",
- "core-mem", "core-mem-inv",
- "active-servers", "active-servers-inv",
- "provisioned-cores", "provisioned-cores-inv",
- "random", "replay"
- )
- .default("core-mem")
-
- private val trace by option("--trace-directory", help = "path to the trace directory")
- .file(canBeFile = false)
- .required()
-
- private val reporter by option().groupChoice(
- "parquet" to Parquet(),
- "postgres" to Postgres()
- ).required()
-
- private fun parseVMs(string: String): List<String> {
- // Handle case where VM list contains a VM name with an (escaped) single-quote in it
- val sanitizedString = string.replace("\\'", "\\\\[")
- .replace("'", "\"")
- .replace("\\\\[", "'")
- val vms: List<String> = jacksonObjectMapper().readValue(sanitizedString)
- return vms
- }
-
- override fun run() {
- logger.info("seed: $seed")
- logger.info("failures: $failures")
- logger.info("allocation-policy: $allocationPolicy")
-
- val start = System.currentTimeMillis()
- val reporter: Sc20Reporter = reporter.createReporter()
-
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
-
- val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = when (this.allocationPolicy) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seed))
- "replay" -> ReplayAllocationPolicy(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy ${this.allocationPolicy}")
- }
-
- val performanceInterferenceModel = try {
- Sc20PerformanceInterferenceReader(performanceInterferenceStream).construct()
- } catch (e: Throwable) {
- reporter.close()
- throw e
- }
- val environmentReader = Sc20ClusterEnvironmentReader(environment)
- val traceReader = try {
- createTraceReader(trace, performanceInterferenceModel, selectedVms, seed)
- } catch (e: Throwable) {
- reporter.close()
- throw e
- }
-
- root.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy)
-
- val failureDomain = if (failures) {
- logger.info("ENABLING failures")
- createFailureDomain(seed, failureInterval, bareMetalProvisioner, chan)
- } else {
- null
- }
-
- attachMonitor(scheduler, reporter)
- processTrace(traceReader, scheduler, chan, reporter, vmPlacements)
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
-
- failureDomain?.cancel()
- scheduler.terminate()
- logger.info("Simulation took ${System.currentTimeMillis() - start} milliseconds")
- }
-
- runBlocking {
- system.run()
- system.terminate()
- }
-
- // Explicitly close the monitor to flush its buffer
- reporter.close()
- }
-}
-
-sealed class Reporter(name: String) : OptionGroup(name) {
- /**
- * Create the [Sc20Reporter] for this option.
- */
- abstract fun createReporter(): Sc20Reporter
-}
-
-class Parquet : Reporter("Options for reporting using Parquet") {
- private val path by option(help = "path to where the output should be stored")
- .file()
- .defaultLazy { File("data/results-${System.currentTimeMillis()}.parquet") }
-
- override fun createReporter(): Sc20Reporter = Sc20ParquetReporter(path)
-}
-
-class Postgres : Reporter("Options for reporting using PostgreSQL") {
- private val url by option(help = "JDBC connection url").required()
- private val experimentId by option(help = "Experiment ID").long().required()
-
- override fun createReporter(): Sc20Reporter {
- val conn = DriverManager.getConnection(url)
- return Sc20PostgresReporter(conn, experimentId)
- }
-}
-
-/**
- * Main entry point of the experiment.
- */
-fun main(args: Array<String>) = ExperimentCommand().main(args)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
deleted file mode 100644
index f2139144..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
+++ /dev/null
@@ -1,151 +0,0 @@
-package com.atlarge.opendc.experiments.sc20
-
-import com.atlarge.odcsim.simulationContext
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import kotlinx.coroutines.flow.first
-import mu.KotlinLogging
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-
-private val logger = KotlinLogging.logger {}
-
-class Sc20ParquetReporter(destination: File) : Sc20Reporter {
- private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
- private val schema = SchemaBuilder
- .record("slice")
- .namespace("com.atlarge.opendc.experiments.sc20")
- .fields()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("image_count").type().intType().noDefault()
- .name("server").type().stringType().noDefault()
- .name("host_state").type().stringType().noDefault()
- .name("host_usage").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("total_submitted_vms").type().longType().noDefault()
- .name("total_queued_vms").type().longType().noDefault()
- .name("total_running_vms").type().longType().noDefault()
- .name("total_finished_vms").type().longType().noDefault()
- .endRecord()
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination.absolutePath))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
- private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
- private val writerThread = thread(start = true, name = "sc20-writer") {
- try {
- while (true) {
- val record = queue.take()
- writer.write(record)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- writer.close()
- }
- }
-
- override suspend fun reportVmStateChange(server: Server) {}
-
- override suspend fun reportHostStateChange(
- driver: VirtDriver,
- server: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long
- ) {
- val lastServerState = lastServerStates[server]
- if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = simulationContext.clock.millis() - lastServerState.second
- reportHostSlice(
- simulationContext.clock.millis(),
- 0,
- 0,
- 0,
- 0,
- 0.0,
- 0.0,
- 0,
- server,
- submittedVms,
- queuedVms,
- runningVms,
- finishedVms,
- duration
- )
- }
-
- logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]")
-
- lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
- }
-
- override suspend fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- numberOfDeployedImages: Int,
- hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
- duration: Long
- ) {
- // Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.services[BareMetalDriver.Key]
- val usage = driver.usage.first()
- val powerDraw = driver.powerDraw.first()
-
- val record = GenericData.Record(schema)
- record.put("time", time)
- record.put("duration", duration)
- record.put("requested_burst", requestedBurst)
- record.put("granted_burst", grantedBurst)
- record.put("overcommissioned_burst", overcommissionedBurst)
- record.put("interfered_burst", interferedBurst)
- record.put("cpu_usage", cpuUsage)
- record.put("cpu_demand", cpuDemand)
- record.put("image_count", numberOfDeployedImages)
- record.put("server", hostServer.uid)
- record.put("host_state", hostServer.state)
- record.put("host_usage", usage)
- record.put("power_draw", powerDraw)
- record.put("total_submitted_vms", submittedVms)
- record.put("total_queued_vms", queuedVms)
- record.put("total_running_vms", runningVms)
- record.put("total_finished_vms", finishedVms)
-
- queue.put(record)
- }
-
- override fun close() {
- // Busy loop to wait for writer thread to finish
- while (queue.isNotEmpty()) {
- Thread.sleep(500)
- }
- writerThread.interrupt()
- }
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
deleted file mode 100644
index 5c5e6ceb..00000000
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20PostgresReporter.kt
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.experiments.sc20
-
-import com.atlarge.odcsim.simulationContext
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import kotlinx.coroutines.flow.first
-import mu.KotlinLogging
-import java.sql.Connection
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.atomic.AtomicBoolean
-import kotlin.concurrent.thread
-
-private val logger = KotlinLogging.logger {}
-
-class Sc20PostgresReporter(val conn: Connection, val experimentId: Long) : Sc20Reporter {
- private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
- private val queue = ArrayBlockingQueue<Report>(2048)
- private val stop = AtomicBoolean(false)
- private val writerThread = thread(start = true, name = "sc20-writer") {
- val stmt = try {
- conn.prepareStatement(
- """
- INSERT INTO host_reports (experiment_id, time, duration, requested_burst, granted_burst, overcommissioned_burst, interfered_burst, cpu_usage, cpu_demand, image_count, server, host_state, host_usage, power_draw, total_submitted_vms, total_queued_vms, total_running_vms, total_finished_vms)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """.trimIndent()
- )
- } catch (e: Throwable) {
- conn.close()
- throw e
- }
-
- val batchSize = 4096
- var batch = 0
-
- try {
- while (!stop.get()) {
- val record = queue.take()
- stmt.setLong(1, experimentId)
- stmt.setLong(2, record.time)
- stmt.setLong(3, record.duration)
- stmt.setLong(4, record.requestedBurst)
- stmt.setLong(5, record.grantedBurst)
- stmt.setLong(6, record.overcommissionedBurst)
- stmt.setLong(7, record.interferedBurst)
- stmt.setDouble(8, record.cpuUsage)
- stmt.setDouble(9, record.cpuDemand)
- stmt.setInt(10, record.numberOfDeployedImages)
- stmt.setString(11, record.hostServer.uid.toString())
- stmt.setString(12, record.hostServer.state.name)
- stmt.setDouble(13, record.hostUsage)
- stmt.setDouble(14, record.powerDraw)
- stmt.setLong(15, record.submittedVms)
- stmt.setLong(16, record.queuedVms)
- stmt.setLong(17, record.runningVms)
- stmt.setLong(18, record.finishedVms)
- stmt.addBatch()
- batch++
-
- if (batch > batchSize) {
- stmt.executeBatch()
- batch = 0
- }
- }
- } finally {
- stmt.executeBatch()
- stmt.close()
- conn.close()
- }
- }
-
- override suspend fun reportVmStateChange(server: Server) {}
-
- override suspend fun reportHostStateChange(
- driver: VirtDriver,
- server: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long
- ) {
- val lastServerState = lastServerStates[server]
- if (server.state == ServerState.SHUTOFF && lastServerState != null) {
- val duration = simulationContext.clock.millis() - lastServerState.second
- reportHostSlice(
- simulationContext.clock.millis(),
- 0,
- 0,
- 0,
- 0,
- 0.0,
- 0.0,
- 0,
- server,
- submittedVms,
- queuedVms,
- runningVms,
- finishedVms,
- duration
- )
- }
-
- logger.info("Host ${server.uid} changed state ${server.state} [${simulationContext.clock.millis()}]")
-
- lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
- }
-
- override suspend fun reportHostSlice(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- overcommissionedBurst: Long,
- interferedBurst: Long,
- cpuUsage: Double,
- cpuDemand: Double,
- numberOfDeployedImages: Int,
- hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
- duration: Long
- ) {
- // Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.services[BareMetalDriver.Key]
- val usage = driver.usage.first()
- val powerDraw = driver.powerDraw.first()
-
- queue.put(
- Report(
- time,
- duration,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- numberOfDeployedImages,
- hostServer,
- usage,
- powerDraw,
- submittedVms,
- queuedVms,
- runningVms,
- finishedVms
- )
- )
- }
-
- override fun close() {
- // Busy loop to wait for writer thread to finish
- stop.set(true)
- writerThread.join()
- }
-
- data class Report(
- val time: Long,
- val duration: Long,
- val requestedBurst: Long,
- val grantedBurst: Long,
- val overcommissionedBurst: Long,
- val interferedBurst: Long,
- val cpuUsage: Double,
- val cpuDemand: Double,
- val numberOfDeployedImages: Int,
- val hostServer: Server,
- val hostUsage: Double,
- val powerDraw: Double,
- val submittedVms: Long,
- val queuedVms: Long,
- val runningVms: Long,
- val finishedVms: Long
- )
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt
new file mode 100644
index 00000000..f3ac2554
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt
@@ -0,0 +1,78 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetRunEventWriter
+import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader
+import java.io.File
+
+/**
+ * The global configuration of the experiment.
+ *
+ * @param environments The path to the topologies directory.
+ * @param traces The path to the traces directory.
+ * @param output The output directory.
+ * @param performanceInterferenceModel The optional performance interference model that has been specified.
+ * @param vmPlacements Original VM placement in the trace.
+ * @param bufferSize The buffer size of the event reporters.
+ */
+public abstract class Experiment(
+ val environments: File,
+ val traces: File,
+ val output: File,
+ val performanceInterferenceModel: PerformanceInterferenceModelReader?,
+ val vmPlacements: Map<String, String>,
+ val bufferSize: Int
+) : ContainerExperimentDescriptor() {
+ override val parent: ExperimentDescriptor? = null
+
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val writer = ParquetRunEventWriter(File(output, "experiments.parquet"), bufferSize)
+ try {
+ val listener = object : ExperimentExecutionListener by context.listener {
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ if (descriptor is Run) {
+ writer.write(RunEvent(descriptor, System.currentTimeMillis()))
+ }
+
+ context.listener.descriptorRegistered(descriptor)
+ }
+ }
+
+ val newContext = object : ExperimentExecutionContext by context {
+ override val listener: ExperimentExecutionListener = listener
+ }
+
+ super.invoke(newContext)
+ } finally {
+ writer.close()
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index e37dea8b..83952d43 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.experiment
import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.simulationContext
@@ -31,14 +31,18 @@ import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -66,7 +70,7 @@ private val logger = KotlinLogging.logger {}
*/
suspend fun createFailureDomain(
seed: Int,
- failureInterval: Int,
+ failureInterval: Double,
bareMetalProvisioner: ProvisioningService,
chan: Channel<Unit>
): Domain {
@@ -79,7 +83,13 @@ suspend fun createFailureDomain(
for (node in bareMetalProvisioner.nodes()) {
val cluster = node.metadata[NODE_CLUSTER] as String
val injector =
- injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random, failureInterval) }
+ injectors.getOrPut(cluster) {
+ createFaultInjector(
+ simulationContext.domain,
+ random,
+ failureInterval
+ )
+ }
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
@@ -89,13 +99,13 @@ suspend fun createFailureDomain(
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): FaultInjector {
+fun createFaultInjector(domain: Domain, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
domain,
- iatScale = ln(failureInterval.toDouble()), iatShape = 1.03, // Hours
- sizeScale = 1.88, sizeShape = 1.25,
+ iatScale = ln(failureInterval), iatShape = 1.03, // Hours
+ sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = 9.51, dShape = 3.21, // Minutes
random = random
)
@@ -104,8 +114,13 @@ fun createFaultInjector(domain: Domain, random: Random, failureInterval: Int): F
/**
* Create the trace reader from which the VM workloads are read.
*/
-fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader {
- return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed))
+fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20StreamingParquetTraceReader {
+ return Sc20StreamingParquetTraceReader(
+ path,
+ performanceInterferenceModel,
+ vms,
+ Random(seed)
+ )
}
/**
@@ -134,19 +149,21 @@ suspend fun createProvisioner(
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) {
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
val domain = simulationContext.domain
+ val clock = simulationContext.clock
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server)
hypervisor.server.events
.onEach { event ->
+ val time = clock.millis()
when (event) {
is ServerEvent.StateChanged -> {
- reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ monitor.reportHostStateChange(time, hypervisor, event.server)
}
}
}
@@ -154,7 +171,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> reporter.reportHostSlice(
+ is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
simulationContext.clock.millis(),
event.requestedBurst,
event.grantedBurst,
@@ -163,26 +180,36 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.hostServer,
- scheduler.submittedVms,
- scheduler.queuedVms,
- scheduler.runningVms,
- scheduler.finishedVms
+ event.hostServer
)
}
}
.launchIn(domain)
+
+ val driver = hypervisor.server.services[BareMetalDriver.Key]
+ driver.powerDraw
+ .onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
+ .launchIn(domain)
}
+
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ monitor.reportProvisionerMetrics(clock.millis(), event)
+ }
+ }
+ .launchIn(domain)
}
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) {
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
val domain = simulationContext.domain
try {
- var submitted = 0L
+ var submitted = 0
val finished = Channel<Unit>(Channel.CONFLATED)
val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
@@ -215,8 +242,10 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
// Monitor server events
server.events
.onEach {
+ val time = simulationContext.clock.millis()
+
if (it is ServerEvent.StateChanged) {
- reporter.reportVmStateChange(it.server)
+ monitor.reportVmStateChange(time, it.server)
}
delay(1)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt
new file mode 100644
index 00000000..6a40f5fb
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolio.kt
@@ -0,0 +1,90 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+
+/**
+ * A portfolio represents a collection of scenarios are tested.
+ */
+public abstract class Portfolio(
+ override val parent: Experiment,
+ val id: Int,
+ val name: String
+) : ContainerExperimentDescriptor() {
+ /**
+ * The topologies to consider.
+ */
+ protected abstract val topologies: List<Topology>
+
+ /**
+ * The workloads to consider.
+ */
+ protected abstract val workloads: List<Workload>
+
+ /**
+ * The operational phenomenas to consider.
+ */
+ protected abstract val operationalPhenomenas: List<OperationalPhenomena>
+
+ /**
+ * The allocation policies to consider.
+ */
+ protected abstract val allocationPolicies: List<String>
+
+ /**
+ * The number of repetitions to perform.
+ */
+ open val repetitions: Int = 32
+
+ /**
+ * Resolve the children of this container.
+ */
+ override val children: Sequence<Scenario> = sequence {
+ var id = 0
+ for (topology in topologies) {
+ for (workload in workloads) {
+ for (operationalPhenomena in operationalPhenomenas) {
+ for (allocationPolicy in allocationPolicies) {
+ yield(
+ Scenario(
+ this@Portfolio,
+ id++,
+ repetitions,
+ topology,
+ workload,
+ allocationPolicy,
+ operationalPhenomena
+ )
+ )
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
new file mode 100644
index 00000000..362144ae
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
@@ -0,0 +1,155 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+
+public class HorVerPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "horizontal_vs_vertical") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("rep-vol-hor-hom"),
+ Topology("rep-vol-hor-het"),
+ Topology("rep-vol-ver-hom"),
+ Topology("rep-vol-ver-het"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-hor-het"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vol-ver-het")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_velocity") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("rep-vel-ver-hom"),
+ Topology("rep-vel-ver-het"),
+ Topology("exp-vel-ver-hom"),
+ Topology("exp-vel-ver-het")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") {
+ override val topologies = listOf(
+ Topology("base"),
+ Topology("exp-vol-hor-hom"),
+ Topology("exp-vol-ver-hom"),
+ Topology("exp-vel-ver-hom")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies = listOf(
+ "active-servers"
+ )
+}
+
+public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "operational_phenomena") {
+ override val topologies = listOf(
+ Topology("base")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
+ Workload("solvinity", 0.5),
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = true),
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false),
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicies = listOf(
+ "mem",
+ "mem-inv",
+ "core-mem",
+ "core-mem-inv",
+ "active-servers",
+ "active-servers-inv",
+ "random"
+ )
+}
+
+public class TestPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "test") {
+ override val repetitions: Int = 1
+
+ override val topologies: List<Topology> = listOf(
+ Topology("base")
+ )
+
+ override val workloads: List<Workload> = listOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas: List<OperationalPhenomena> = listOf(
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ )
+
+ override val allocationPolicies: List<String> = listOf("active-servers")
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
new file mode 100644
index 00000000..fd3e29c8
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -0,0 +1,146 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
+import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import java.io.File
+import java.util.ServiceLoader
+import kotlin.random.Random
+
+/**
+ * The logger for the experiment scenario.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * The provider for the simulation engine to use.
+ */
+private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+
+/**
+ * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
+ * same set of parameters.
+ */
+public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val experiment = parent.parent.parent
+ val system = provider("experiment-$id")
+ val root = system.newDomain("root")
+ val seeder = Random(seed)
+ val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = when (parent.allocationPolicy) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ "replay" -> ReplayAllocationPolicy(emptyMap())
+ else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ val rawTraceReaders = context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader>
+ val raw = synchronized(rawTraceReaders) {
+ val name = parent.workload.name
+ rawTraceReaders.computeIfAbsent(name) {
+ logger.info { "Loading trace $name" }
+ Sc20RawParquetTraceReader(File(experiment.traces, name))
+ }
+ }
+ val performanceInterferenceModel = experiment.performanceInterferenceModel
+ ?.takeIf { parent.operationalPhenomena.hasInterference }
+ ?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(raw, performanceInterferenceModel, parent.workload, seed)
+
+ val monitor = ParquetExperimentMonitor(this)
+
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(
+ root,
+ environment,
+ allocationPolicy
+ )
+
+ val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(
+ seeder.nextInt(),
+ parent.operationalPhenomena.failureFrequency,
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ trace,
+ scheduler,
+ chan,
+ monitor,
+ experiment.vmPlacements
+ )
+
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ try {
+ system.run()
+ } finally {
+ system.terminate()
+ monitor.close()
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt
new file mode 100644
index 00000000..98bc7fc2
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Scenario.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment
+
+import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
+import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+
+/**
+ * A scenario represents a single point in the design space (a unique combination of parameters).
+ */
+public class Scenario(
+ override val parent: Portfolio,
+ val id: Int,
+ val repetitions: Int,
+ val topology: Topology,
+ val workload: Workload,
+ val allocationPolicy: String,
+ val operationalPhenomena: OperationalPhenomena
+) : ContainerExperimentDescriptor() {
+ override val children: Sequence<ExperimentDescriptor> = sequence {
+ repeat(repetitions) { i -> yield(Run(this@Scenario, i, i)) }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt
new file mode 100644
index 00000000..af99df84
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt
@@ -0,0 +1,33 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.model
+
+/**
+ * Operation phenomena during experiments.
+ *
+ * @param failureFrequency The average time between failures in hours.
+ * @param hasInterference A flag to enable performance interference between VMs.
+ */
+public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt
new file mode 100644
index 00000000..3ed71e09
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Topology.kt
@@ -0,0 +1,30 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.model
+
+/**
+ * The datacenter topology on which we test the workload.
+ */
+public data class Topology(val name: String)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
new file mode 100644
index 00000000..2dbdf570
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
@@ -0,0 +1,30 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.model
+
+/**
+ * A workload that is considered for a scenario.
+ */
+public class Workload(val name: String, val fraction: Double)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
index 84500417..1f674f00 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
@@ -22,34 +22,40 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.experiment.monitor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import java.io.Closeable
-interface Sc20Reporter : Closeable {
+/**
+ * A monitor watches the events of an experiment.
+ */
+interface ExperimentMonitor : Closeable {
/**
* This method is invoked when the state of a VM changes.
*/
- suspend fun reportVmStateChange(server: Server) {}
+ fun reportVmStateChange(time: Long, server: Server) {}
/**
* This method is invoked when the state of a host changes.
*/
- suspend fun reportHostStateChange(
+ fun reportHostStateChange(
+ time: Long,
driver: VirtDriver,
- server: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long
+ server: Server
) {}
/**
+ * Report the power consumption of a host.
+ */
+ fun reportPowerConsumption(host: Server, draw: Double) {}
+
+ /**
* This method is invoked for a host for each slice that is finishes.
*/
- suspend fun reportHostSlice(
+ fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -59,10 +65,11 @@ interface Sc20Reporter : Closeable {
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
duration: Long = 5 * 60 * 1000L
) {}
+
+ /**
+ * This method is invoked for a provisioner event.
+ */
+ fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
new file mode 100644
index 00000000..33978aab
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -0,0 +1,145 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.experiment.monitor
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
+import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter
+import mu.KotlinLogging
+import java.io.File
+
+/**
+ * The logger instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * An [ExperimentMonitor] that logs the events to a Parquet file.
+ */
+class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
+ private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}"
+ private val hostWriter = ParquetHostEventWriter(
+ File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"),
+ run.parent.parent.parent.bufferSize
+ )
+ private val provisionerWriter = ParquetProvisionerEventWriter(
+ File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"),
+ run.parent.parent.parent.bufferSize
+ )
+ private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
+
+ override fun reportVmStateChange(time: Long, server: Server) {}
+
+ override fun reportHostStateChange(
+ time: Long,
+ driver: VirtDriver,
+ server: Server
+ ) {
+ logger.debug("Host ${server.uid} changed state ${server.state} [$time]")
+
+ val lastServerState = lastServerStates[server]
+ if (server.state == ServerState.SHUTOFF && lastServerState != null) {
+ val duration = time - lastServerState.second
+ reportHostSlice(
+ time,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server,
+ duration
+ )
+
+ lastServerStates.remove(server)
+ lastPowerConsumption.remove(server)
+ } else {
+ lastServerStates[server] = Pair(server.state, time)
+ }
+ }
+
+ private val lastPowerConsumption = mutableMapOf<Server, Double>()
+
+ override fun reportPowerConsumption(host: Server, draw: Double) {
+ lastPowerConsumption[host] = draw
+ }
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long
+ ) {
+ hostWriter.write(
+ HostEvent(
+ time,
+ duration,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0
+ )
+ )
+ }
+
+ override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ provisionerWriter.write(
+ ProvisionerEvent(
+ time,
+ event.totalHostCount,
+ event.availableHostCount,
+ event.totalVmCount,
+ event.activeVmCount,
+ event.inactiveVmCount,
+ event.waitingVmCount,
+ event.failedVmCount
+ )
+ )
+ }
+
+ override fun close() {
+ hostWriter.close()
+ provisionerWriter.close()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
new file mode 100644
index 00000000..f59402d5
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
@@ -0,0 +1,75 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import me.tongfei.progressbar.ProgressBar
+import me.tongfei.progressbar.ProgressBarBuilder
+
+/**
+ * A reporter that reports the experiment progress to the console.
+ */
+public class ConsoleExperimentReporter : ExperimentExecutionListener {
+ /**
+ * The active [Run]s.
+ */
+ private val runs: MutableSet<Run> = mutableSetOf()
+
+ /**
+ * The total number of runs.
+ */
+ private var total = 0
+
+ /**
+ * The progress bar to keep track of the progress.
+ */
+ private val pb: ProgressBar = ProgressBarBuilder()
+ .setTaskName("")
+ .setInitialMax(1)
+ .build()
+
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ if (descriptor is Run) {
+ runs += descriptor
+ pb.maxHint((++total).toLong())
+ }
+ }
+
+ override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) {
+ if (descriptor is Run) {
+ runs -= descriptor
+
+ pb.stepTo(total - runs.size.toLong())
+ if (runs.isEmpty()) {
+ pb.close()
+ }
+ }
+ }
+
+ override fun executionStarted(descriptor: ExperimentDescriptor) {}
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt
new file mode 100644
index 00000000..dac32586
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ContainerExperimentDescriptor.kt
@@ -0,0 +1,68 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.supervisorScope
+
+/**
+ * An abstract [ExperimentDescriptor] specifically for containers.
+ */
+public abstract class ContainerExperimentDescriptor : ExperimentDescriptor() {
+ /**
+ * The child descriptors of this container.
+ */
+ public abstract val children: Sequence<ExperimentDescriptor>
+
+ override val type: Type = Type.CONTAINER
+
+ override suspend fun invoke(context: ExperimentExecutionContext) {
+ val materializedChildren = children.toList()
+ for (child in materializedChildren) {
+ context.listener.descriptorRegistered(child)
+ }
+
+ supervisorScope {
+ for (child in materializedChildren) {
+ if (child.isTrial) {
+ launch {
+ val worker = context.scheduler.allocate()
+ context.listener.executionStarted(child)
+ try {
+ worker(child, context)
+ context.listener.executionFinished(child, ExperimentExecutionResult.Success)
+ } catch (e: Throwable) {
+ context.listener.executionFinished(child, ExperimentExecutionResult.Failed(e))
+ }
+ }
+ } else {
+ launch { child(context) }
+ }
+ }
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt
new file mode 100644
index 00000000..64b6b767
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentDescriptor.kt
@@ -0,0 +1,81 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import java.io.Serializable
+
+/**
+ * An immutable description of an experiment in the **odcsim* simulation framework, which may be a single atomic trial
+ * or a composition of multiple trials.
+ *
+ * This class represents a dynamic tree-like structure where the children of the nodes are not known at instantiation
+ * since they might be generated dynamically.
+ */
+public abstract class ExperimentDescriptor : Serializable {
+ /**
+ * The parent of this descriptor, or `null` if it has no parent.
+ */
+ public abstract val parent: ExperimentDescriptor?
+
+ /**
+ * The type of descriptor.
+ */
+ abstract val type: Type
+
+ /**
+ * A flag to indicate that this descriptor is a root descriptor.
+ */
+ public open val isRoot: Boolean
+ get() = parent == null
+
+ /**
+ * A flag to indicate that this descriptor describes an experiment trial.
+ */
+ val isTrial: Boolean
+ get() = type == Type.TRIAL
+
+ /**
+ * Execute this [ExperimentDescriptor].
+ *
+ * @param context The context to execute the descriptor in.
+ */
+ public abstract suspend operator fun invoke(context: ExperimentExecutionContext)
+
+ /**
+ * The types of experiment descriptors.
+ */
+ enum class Type {
+ /**
+ * A composition of multiple experiment descriptions whose invocation happens on a single thread.
+ */
+ CONTAINER,
+
+ /**
+ * An invocation of a single scenario of an experiment whose invocation may happen on different threads.
+ */
+ TRIAL
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt
new file mode 100644
index 00000000..77f970fe
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/ExperimentRunner.kt
@@ -0,0 +1,51 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+
+/**
+ * An [ExperimentRunner] facilitates discovery and execution of experiments.
+ */
+public interface ExperimentRunner {
+ /**
+ * The unique identifier of this runner.
+ */
+ val id: String
+
+ /**
+ * The version of this runner.
+ */
+ val version: String?
+ get() = null
+
+ /**
+ * Execute the specified experiment represented as [ExperimentDescriptor].
+ *
+ * @param root The experiment to execute.
+ * @param listener The listener to report events to.
+ */
+ public fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener)
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt
new file mode 100644
index 00000000..cf05416a
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/TrialExperimentDescriptor.kt
@@ -0,0 +1,32 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner
+
+/**
+ * An abstract [ExperimentDescriptor] specifically for trials.
+ */
+public abstract class TrialExperimentDescriptor : ExperimentDescriptor() {
+ override val type: Type = Type.TRIAL
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt
new file mode 100644
index 00000000..9a04c491
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionContext.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+/**
+ * The execution context of an experiment.
+ */
+public interface ExperimentExecutionContext {
+ /**
+ * The execution listener to use.
+ */
+ public val listener: ExperimentExecutionListener
+
+ /**
+ * The experiment scheduler to use.
+ */
+ public val scheduler: ExperimentScheduler
+
+ /**
+ * A cache for objects within a single runner.
+ */
+ public val cache: MutableMap<Any?, Any?>
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt
new file mode 100644
index 00000000..f6df0524
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionListener.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+
+/**
+ * Listener to be notified of experiment execution events by experiment runners.
+ */
+interface ExperimentExecutionListener {
+ /**
+ * A method that is invoked when a new [ExperimentDescriptor] is registered.
+ */
+ fun descriptorRegistered(descriptor: ExperimentDescriptor)
+
+ /**
+ * A method that is invoked when when the execution of a leaf or subtree of the experiment tree has finished,
+ * regardless of the outcome.
+ */
+ fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult)
+
+ /**
+ * A method that is invoked when the execution of a leaf or subtree of the experiment tree is about to be started.
+ */
+ fun executionStarted(descriptor: ExperimentDescriptor)
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt
new file mode 100644
index 00000000..057e1f92
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentExecutionResult.kt
@@ -0,0 +1,42 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import java.io.Serializable
+
+/**
+ * The result of executing an experiment.
+ */
+public sealed class ExperimentExecutionResult : Serializable {
+ /**
+ * The experiment executed successfully
+ */
+ public object Success : ExperimentExecutionResult()
+
+ /**
+ * The experiment failed during execution.
+ */
+ public data class Failed(val throwable: Throwable) : ExperimentExecutionResult()
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
new file mode 100644
index 00000000..0346a7f8
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
@@ -0,0 +1,59 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import java.io.Closeable
+
+/**
+ * A interface for scheduling the execution of experiment trials over compute resources (threads/containers/vms)
+ */
+interface ExperimentScheduler : Closeable {
+ /**
+ * Allocate a [Worker] for executing an experiment trial. This method may suspend in case no resources are directly
+ * available at the moment.
+ *
+ * @return The available worker.
+ */
+ suspend fun allocate(): ExperimentScheduler.Worker
+
+ /**
+ * An isolated worker of an [ExperimentScheduler] that is responsible for executing a single experiment trial.
+ */
+ interface Worker {
+ /**
+ * Dispatch the specified [ExperimentDescriptor] to execute some time in the future and return the results of
+ * the trial.
+ *
+ * @param descriptor The descriptor to execute.
+ * @param context The context to execute the descriptor in.
+ * @return The results of the experiment trial.
+ */
+ suspend operator fun invoke(
+ descriptor: ExperimentDescriptor,
+ context: ExperimentExecutionContext
+ ): ExperimentExecutionResult
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
new file mode 100644
index 00000000..31632b8c
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
@@ -0,0 +1,85 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.execution
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import kotlinx.coroutines.asCoroutineDispatcher
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.supervisorScope
+import kotlinx.coroutines.sync.Semaphore
+import kotlinx.coroutines.withContext
+import java.util.concurrent.Executors
+
+/**
+ * An [ExperimentScheduler] that runs experiments using a local thread pool.
+ *
+ * @param parallelism The maximum amount of parallel workers (default is the number of available processors).
+ */
+class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().availableProcessors() + 1) : ExperimentScheduler {
+ private val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()
+ private val tickets = Semaphore(parallelism)
+
+ override suspend fun allocate(): ExperimentScheduler.Worker {
+ tickets.acquire()
+ return object : ExperimentScheduler.Worker {
+ override suspend fun invoke(
+ descriptor: ExperimentDescriptor,
+ context: ExperimentExecutionContext
+ ): ExperimentExecutionResult = supervisorScope {
+ val listener =
+ object : ExperimentExecutionListener {
+ override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
+ launch { context.listener.descriptorRegistered(descriptor) }
+ }
+
+ override fun executionFinished(descriptor: ExperimentDescriptor, result: ExperimentExecutionResult) {
+ launch { context.listener.executionFinished(descriptor, result) }
+ }
+
+ override fun executionStarted(descriptor: ExperimentDescriptor) {
+ launch { context.listener.executionStarted(descriptor) }
+ }
+ }
+
+ val newContext = object : ExperimentExecutionContext by context {
+ override val listener: ExperimentExecutionListener = listener
+ }
+
+ try {
+ withContext(dispatcher) {
+ descriptor(newContext)
+ ExperimentExecutionResult.Success
+ }
+ } catch (e: Throwable) {
+ ExperimentExecutionResult.Failed(e)
+ } finally {
+ tickets.release()
+ }
+ }
+ }
+ }
+
+ override fun close() = dispatcher.close()
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
new file mode 100644
index 00000000..3b80276f
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
@@ -0,0 +1,62 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.runner.internal
+
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import com.atlarge.opendc.experiments.sc20.runner.ExperimentRunner
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
+import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler
+import kotlinx.coroutines.runBlocking
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * The default implementation of the [ExperimentRunner] interface.
+ *
+ * @param scheduler The scheduler to use.
+ */
+public class DefaultExperimentRunner(val scheduler: ExperimentScheduler) : ExperimentRunner {
+ override val id: String = "default"
+
+ override val version: String? = "1.0"
+
+ override fun execute(root: ExperimentDescriptor, listener: ExperimentExecutionListener) = runBlocking {
+ val context = object : ExperimentExecutionContext {
+ override val listener: ExperimentExecutionListener = listener
+ override val scheduler: ExperimentScheduler = this@DefaultExperimentRunner.scheduler
+ override val cache: MutableMap<Any?, Any?> = ConcurrentHashMap()
+ }
+
+ listener.descriptorRegistered(root)
+ context.listener.executionStarted(root)
+ try {
+ root(context)
+ context.listener.executionFinished(root, ExperimentExecutionResult.Success)
+ } catch (e: Throwable) {
+ context.listener.executionFinished(root, ExperimentExecutionResult.Failed(e))
+ }
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt
new file mode 100644
index 00000000..c1e14e2a
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/Event.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+/**
+ * An event that occurs within the system.
+ */
+public abstract class Event(val name: String) {
+ /**
+ * The time of occurrence of this event.
+ */
+ public abstract val timestamp: Long
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
new file mode 100644
index 00000000..8e91bca2
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
@@ -0,0 +1,44 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+import com.atlarge.opendc.compute.core.Server
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+data class HostEvent(
+ override val timestamp: Long,
+ val duration: Long,
+ val host: Server,
+ val vmCount: Int,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val powerDraw: Double
+) : Event("host-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt
new file mode 100644
index 00000000..df619632
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+/**
+ * A periodic report of the provisioner's metrics.
+ */
+data class ProvisionerEvent(
+ override val timestamp: Long,
+ val totalHostCount: Int,
+ val availableHostCount: Int,
+ val totalVmCount: Int,
+ val activeVmCount: Int,
+ val inactiveVmCount: Int,
+ val waitingVmCount: Int,
+ val failedVmCount: Int
+) : Event("provisioner-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt
new file mode 100644
index 00000000..497d2c3f
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/RunEvent.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+import com.atlarge.opendc.experiments.sc20.experiment.Run
+
+/**
+ * A periodic report of the host machine metrics.
+ */
+data class RunEvent(
+ val run: Run,
+ override val timestamp: Long
+) : Event("run")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt
new file mode 100644
index 00000000..7289fb21
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/VmEvent.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry
+
+import com.atlarge.opendc.compute.core.Server
+
+/**
+ * A periodic report of a virtual machine's metrics.
+ */
+data class VmEvent(
+ override val timestamp: Long,
+ val duration: Long,
+ val vm: Server,
+ val host: Server,
+ val requestedBurst: Long,
+ val grantedBurst: Long,
+ val overcommissionedBurst: Long,
+ val interferedBurst: Long,
+ val cpuUsage: Double,
+ val cpuDemand: Double
+) : Event("vm-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
new file mode 100644
index 00000000..a69bd4b2
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -0,0 +1,121 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.Event
+import mu.KotlinLogging
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * The logging instance to use.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A writer that writes events in Parquet format.
+ */
+public open class ParquetEventWriter<in T : Event>(
+ private val path: File,
+ private val schema: Schema,
+ private val converter: (T, GenericData.Record) -> Unit,
+ private val bufferSize: Int = 4096
+) : Runnable, Closeable {
+ /**
+ * The queue of commands to process.
+ */
+ private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread = thread(start = true, name = "parquet-writer") { run() }
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(event: T) {
+ queue.put(Action.Write(event))
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ public override fun close() {
+ queue.put(Action.Stop)
+ writerThread.join()
+ }
+
+ /**
+ * Start the writer thread.
+ */
+ override fun run() {
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ try {
+ loop@ while (true) {
+ val action = queue.take()
+ when (action) {
+ is Action.Stop -> break@loop
+ is Action.Write<*> -> {
+ val record = GenericData.Record(schema)
+ @Suppress("UNCHECKED_CAST")
+ converter(action.event as T, record)
+ writer.write(record)
+ }
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error("Writer failed", e)
+ } finally {
+ writer.close()
+ }
+ }
+
+ sealed class Action {
+ /**
+ * A poison pill that will stop the writer thread.
+ */
+ object Stop : Action()
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ data class Write<out T : Event>(val event: T) : Action()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
new file mode 100644
index 00000000..523897b0
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -0,0 +1,81 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostEvent]s.
+ */
+public class ParquetHostEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ companion object {
+ val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
+ // record.put("portfolio_id", event.run.parent.parent.id)
+ // record.put("scenario_id", event.run.parent.id)
+ // record.put("run_id", event.run.id)
+ record.put("host_id", event.host.name)
+ record.put("state", event.host.state.name)
+ record.put("timestamp", event.timestamp)
+ record.put("duration", event.duration)
+ record.put("vm_count", event.vmCount)
+ record.put("requested_burst", event.requestedBurst)
+ record.put("granted_burst", event.grantedBurst)
+ record.put("overcommissioned_burst", event.overcommissionedBurst)
+ record.put("interfered_burst", event.interferedBurst)
+ record.put("cpu_usage", event.cpuUsage)
+ record.put("cpu_demand", event.cpuDemand)
+ record.put("power_draw", event.powerDraw * (1.0 / 12))
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("host_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ // .name("portfolio_id").type().intType().noDefault()
+ // .name("scenario_id").type().intType().noDefault()
+ // .name("run_id").type().intType().noDefault()
+ .name("timestamp").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("vm_count").type().intType().noDefault()
+ .name("requested_burst").type().longType().noDefault()
+ .name("granted_burst").type().longType().noDefault()
+ .name("overcommissioned_burst").type().longType().noDefault()
+ .name("interfered_burst").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
new file mode 100644
index 00000000..1f3b0472
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [ProvisionerEvent]s.
+ */
+public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "provisioner-writer"
+
+ companion object {
+ val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
+ record.put("timestamp", event.timestamp)
+ record.put("host_total_count", event.totalHostCount)
+ record.put("host_available_count", event.availableHostCount)
+ record.put("vm_total_count", event.totalVmCount)
+ record.put("vm_active_count", event.activeVmCount)
+ record.put("vm_inactive_count", event.inactiveVmCount)
+ record.put("vm_waiting_count", event.waitingVmCount)
+ record.put("vm_failed_count", event.failedVmCount)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("provisioner_metrics")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_total_count").type().intType().noDefault()
+ .name("host_available_count").type().intType().noDefault()
+ .name("vm_total_count").type().intType().noDefault()
+ .name("vm_active_count").type().intType().noDefault()
+ .name("vm_inactive_count").type().intType().noDefault()
+ .name("vm_waiting_count").type().intType().noDefault()
+ .name("vm_failed_count").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
new file mode 100644
index 00000000..1549b8d2
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
@@ -0,0 +1,78 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.telemetry.parquet
+
+import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import java.io.File
+
+/**
+ * A Parquet event writer for [RunEvent]s.
+ */
+public class ParquetRunEventWriter(path: File, bufferSize: Int) :
+ ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "run-writer"
+
+ companion object {
+ val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
+ val run = event.run
+ val scenario = run.parent
+ val portfolio = scenario.parent
+ record.put("portfolio_id", portfolio.id)
+ record.put("portfolio_name", portfolio.name)
+ record.put("scenario_id", scenario.id)
+ record.put("run_id", run.id)
+ record.put("repetitions", scenario.repetitions)
+ record.put("topology", scenario.topology.name)
+ record.put("workload_name", scenario.workload.name)
+ record.put("workload_fraction", scenario.workload.fraction)
+ record.put("allocation_policy", scenario.allocationPolicy)
+ record.put("failure_frequency", scenario.operationalPhenomena.failureFrequency)
+ record.put("interference", scenario.operationalPhenomena.hasInterference)
+ record.put("seed", run.seed)
+ }
+
+ val schema: Schema = SchemaBuilder
+ .record("runs")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("portfolio_id").type().intType().noDefault()
+ .name("portfolio_name").type().stringType().noDefault()
+ .name("scenario_id").type().intType().noDefault()
+ .name("run_id").type().intType().noDefault()
+ .name("repetitions").type().intType().noDefault()
+ .name("topology").type().stringType().noDefault()
+ .name("workload_name").type().stringType().noDefault()
+ .name("workload_fraction").type().doubleType().noDefault()
+ .name("allocation_policy").type().stringType().noDefault()
+ .name("failure_frequency").type().doubleType().noDefault()
+ .name("interference").type().booleanType().noDefault()
+ .name("seed").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
new file mode 100644
index 00000000..ad50bf18
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -0,0 +1,88 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import java.util.TreeSet
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param reader The internal trace reader to use.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ * @param run The run to which this reader belongs.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20ParquetTraceReader(
+ raw: Sc20RawParquetTraceReader,
+ performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
+ workload: Workload,
+ seed: Int
+) : TraceReader<VmWorkload> {
+ /**
+ * The iterator over the actual trace.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>> =
+ raw.read()
+ .run { sampleWorkload(this, workload, seed) }
+ .run {
+ // Apply performance interference model
+ if (performanceInterferenceModel.isEmpty())
+ this
+ else {
+ map { entry ->
+ val image = entry.workload.image
+ val id = image.name
+ val relevantPerformanceInterferenceModelItems =
+ performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
+
+ val newImage =
+ VmImage(
+ image.uid,
+ image.name,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ image.flopsHistory,
+ image.maxCores,
+ image.requiredMemory
+ )
+ val newWorkload = entry.workload.copy(image = newImage)
+ Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
+ }
+ }
+ }
+ .iterator()
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
new file mode 100644
index 00000000..3b480d33
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -0,0 +1,167 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import mu.KotlinLogging
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import java.io.File
+import java.util.UUID
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param path The directory of the traces.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20RawParquetTraceReader(private val path: File) {
+ /**
+ * Read the fragments into memory.
+ */
+ private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .build()
+
+ val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>()
+
+ return try {
+ while (true) {
+ val record = reader.read() ?: break
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ }
+
+ fragments
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Read the metadata into a workload.
+ */
+ private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> {
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .build()
+
+ var counter = 0
+ val entries = mutableListOf<TraceEntryImpl>()
+
+ return try {
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+
+ logger.info { "VM $id" }
+
+ val vmFragments = fragments.getValue(id).asSequence()
+ val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
+ val vmWorkload = VmWorkload(
+ uid, id,
+ UnnamedUser,
+ VmImage(
+ uid,
+ id,
+ mapOf(
+ "submit-time" to submissionTime,
+ "end-time" to endTime,
+ "total-load" to totalLoad
+ ),
+ vmFragments,
+ maxCores,
+ requiredMemory
+ )
+ )
+ entries.add(TraceEntryImpl(submissionTime, vmWorkload))
+ }
+
+ entries
+ } finally {
+ metaReader.close()
+ }
+ }
+
+ /**
+ * The entries in the trace.
+ */
+ private val entries: List<TraceEntryImpl>
+
+ init {
+ val fragments = parseFragments(path)
+ entries = parseMeta(path, fragments)
+ }
+
+ /**
+ * Read the entries in the trace.
+ */
+ public fun read(): List<TraceEntry<VmWorkload>> = entries
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ internal data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index 8ae1693c..f6d6e6fd 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.trace
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
@@ -53,13 +53,13 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * A [TraceReader] for the internal VM workload trace format.
+ * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
*
* @param traceFile The directory of the traces.
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
*/
@OptIn(ExperimentalStdlibApi::class)
-class Sc20ParquetTraceReader(
+class Sc20StreamingParquetTraceReader(
traceFile: File,
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
@@ -82,7 +82,11 @@ class Sc20ParquetTraceReader(
if (selectedVms.isEmpty())
null
else
- FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
+ FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"),
+ SelectedVmFilter(
+ TreeSet(selectedVms)
+ )
+ ))
/**
* A poisonous fragment.
@@ -227,11 +231,12 @@ class Sc20ParquetTraceReader(
}
val relevantPerformanceInterferenceModelItems =
PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
Random(random.nextInt())
)
val vmWorkload = VmWorkload(
- uid, "VM Workload $id", UnnamedUser,
+ uid, "VM Workload $id",
+ UnnamedUser,
VmImage(
uid,
id,
@@ -242,7 +247,10 @@ class Sc20ParquetTraceReader(
)
)
- TraceEntryImpl(submissionTime, vmWorkload)
+ TraceEntryImpl(
+ submissionTime,
+ vmWorkload
+ )
}
.sortedBy { it.submissionTime }
.toList()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index c62f59f9..04cdd302 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.experiments.sc20
+package com.atlarge.opendc.experiments.sc20.trace
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
@@ -139,7 +139,14 @@ fun main(args: Array<String>) {
)
} else {
val fragment =
- Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores)
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
if (last != null) {
yield(last!!)
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
new file mode 100644
index 00000000..a8580686
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -0,0 +1,69 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.trace
+
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.format.trace.TraceEntry
+import mu.KotlinLogging
+import kotlin.random.Random
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Sample the workload for the specified [run].
+ */
+fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> {
+ return sampleRegularWorkload(trace, workload, seed)
+}
+
+/**
+ * Sample a regular (non-HPC) workload.
+ */
+fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> {
+ val fraction = workload.fraction
+ if (fraction >= 1) {
+ return trace
+ }
+
+ val shuffled = trace.shuffled(Random(seed))
+ val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ var currentLoad = 0.0
+
+ for (entry in shuffled) {
+ val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json b/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json
deleted file mode 100644
index 87a4f8af..00000000
--- a/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json
+++ /dev/null
@@ -1,7 +0,0 @@
-[
- {
- "vms": [545, 223],
- "minServerLoad": 0.84,
- "performanceScore": 0.6
- }
-]
diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json
deleted file mode 100644
index 80b24dba..00000000
--- a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "name": "Experimental Setup 2",
- "rooms": [
- {
- "type": "SERVER",
- "objects": [
- {
- "type": "RACK",
- "machines": [
- {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}
- ]
- }
- ]
- }
- ]
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json
deleted file mode 100644
index 02a1d25b..00000000
--- a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json
+++ /dev/null
@@ -1,36 +0,0 @@
-{
- "name": "Experimental Setup 2",
- "rooms": [
- {
- "type": "SERVER",
- "objects": [
- {
- "type": "RACK",
- "machines": [
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]},
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]},
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]},
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]},
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]},
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]},
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]},
- {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}
- ]
- },
- {
- "type": "RACK",
- "machines": [
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]},
- {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}
- ]
- }
- ]
- }
- ]
-}
diff --git a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
index 77a15e55..f47a6da8 100644
--- a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
+++ b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml
@@ -33,7 +33,10 @@
<Logger name="com.atlarge.odcsim" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
- <Logger name="com.atlarge.opendc" level="info" additivity="false">
+ <Logger name="com.atlarge.opendc" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc.experiments.sc20" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hadoop" level="warn" additivity="false">
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 239d018a..abd5c961 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -31,10 +31,17 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
+import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.processTrace
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
@@ -63,7 +70,7 @@ class Sc20IntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestSc20Reporter
+ private lateinit var monitor: TestExperimentReporter
/**
* Setup the experimental environment.
@@ -73,7 +80,7 @@ class Sc20IntegrationTest {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
simulationEngine = provider("test")
root = simulationEngine.newDomain("root")
- monitor = TestSc20Reporter()
+ monitor = TestExperimentReporter()
}
/**
@@ -95,19 +102,33 @@ class Sc20IntegrationTest {
lateinit var scheduler: SimpleVirtProvisioningService
root.launch {
- val res = createProvisioner(root, environmentReader, allocationPolicy)
+ val res = createProvisioner(
+ root,
+ environmentReader,
+ allocationPolicy
+ )
val bareMetalProvisioner = res.first
scheduler = res.second
val failureDomain = if (failures) {
println("ENABLING failures")
- createFailureDomain(seed, 24 * 7, bareMetalProvisioner, chan)
+ createFailureDomain(
+ seed,
+ 24.0 * 7,
+ bareMetalProvisioner,
+ chan
+ )
} else {
null
}
attachMonitor(scheduler, monitor)
- processTrace(traceReader, scheduler, chan, monitor)
+ processTrace(
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
@@ -121,8 +142,8 @@ class Sc20IntegrationTest {
assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs")
assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run")
assertEquals(207379117949, monitor.totalRequestedBurst)
- assertEquals(207378478631, monitor.totalGrantedBurst)
- assertEquals(639360, monitor.totalOvercommissionedBurst)
+ assertEquals(207102919834, monitor.totalGrantedBurst)
+ assertEquals(276198896, monitor.totalOvercommissionedBurst)
assertEquals(0, monitor.totalInterferedBurst)
}
@@ -137,10 +158,12 @@ class Sc20IntegrationTest {
* Obtain the trace reader for the test.
*/
private fun createTestTraceReader(): TraceReader<VmWorkload> {
- val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
- val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream)
- .construct()
- return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0)
+ return Sc20ParquetTraceReader(
+ Sc20RawParquetTraceReader(File("src/test/resources/trace")),
+ emptyMap(),
+ Workload("test", 1.0),
+ 0
+ )
}
/**
@@ -151,13 +174,13 @@ class Sc20IntegrationTest {
return Sc20ClusterEnvironmentReader(stream)
}
- class TestSc20Reporter : Sc20Reporter {
+ class TestExperimentReporter : ExperimentMonitor {
var totalRequestedBurst = 0L
var totalGrantedBurst = 0L
var totalOvercommissionedBurst = 0L
var totalInterferedBurst = 0L
- override suspend fun reportHostSlice(
+ override fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -167,10 +190,6 @@ class Sc20IntegrationTest {
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
duration: Long
) {
totalRequestedBurst += requestedBurst
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
index a653e643..407bc0b4 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
@@ -26,6 +26,7 @@ package com.atlarge.opendc.format.trace
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import java.io.Closeable
+import kotlin.random.Random
/**
* An interface for reading descriptions of performance interference models into memory.
@@ -34,5 +35,5 @@ interface PerformanceInterferenceModelReader : Closeable {
/**
* Construct a [PerformanceInterferenceModel].
*/
- fun construct(): PerformanceInterferenceModel
+ fun construct(random: Random): Map<String, PerformanceInterferenceModel>
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 5220af9b..2a8fefeb 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -125,7 +125,7 @@ class BitbrainsTraceReader(
val relevantPerformanceInterferenceModelItems =
PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet()
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSortedSet()
)
val vmWorkload = VmWorkload(
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
index 8562cefe..0e8e1fd2 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import java.io.InputStream
import java.util.TreeSet
+import kotlin.random.Random
/**
* A parser for the JSON performance interference setup files used for the SC20 paper.
@@ -42,20 +43,25 @@ import java.util.TreeSet
class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) :
PerformanceInterferenceModelReader {
/**
- * The environment that was read from the file.
+ * The computed value from the file.
*/
- private val performanceInterferenceModel: List<PerformanceInterferenceEntry> = mapper.readValue(input)
+ private val items: Map<String, TreeSet<PerformanceInterferenceModelItem>>
- override fun construct(): PerformanceInterferenceModel {
- return PerformanceInterferenceModel(
- performanceInterferenceModel.map { item ->
- PerformanceInterferenceModelItem(
- TreeSet(item.vms),
- item.minServerLoad,
- item.performanceScore
- )
- }.toSet()
- )
+ init {
+ val entries: List<PerformanceInterferenceEntry> = mapper.readValue(input)
+ val res = mutableMapOf<String, TreeSet<PerformanceInterferenceModelItem>>()
+ for (entry in entries) {
+ val item = PerformanceInterferenceModelItem(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore)
+ for (workload in entry.vms) {
+ res.computeIfAbsent(workload) { TreeSet() }.add(item)
+ }
+ }
+
+ items = res
+ }
+
+ override fun construct(random: Random): Map<String, PerformanceInterferenceModel> {
+ return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) }
}
override fun close() {}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index c53cd569..076274d5 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -160,7 +160,7 @@ class Sc20TraceReader(
val relevantPerformanceInterferenceModelItems =
PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(),
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(),
Random(random.nextInt())
)
val vmWorkload = VmWorkload(