From ef83b0ff68f023a64a7e3446b23e4a6f02aca841 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 10:53:42 +0100 Subject: Remove unnecessary dependencies on JUnit Platform Launcher This change removes unnecessary dependencies on JUnit Platform launcher from the repository. Previously, the launcher was used to bootstrap tests for Gradle when it did not natively support JUnit Platform. Gradle now has native support for JUnit Platform, so the dependency is not needed anymore. --- simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts | 1 - simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts | 1 - 2 files changed, 2 deletions(-) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index b6b35694..2d5e018b 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -46,5 +46,4 @@ dependencies { testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") - testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") } diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts index b94207ba..8fac6524 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts @@ -54,5 +54,4 @@ dependencies { testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") - testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") } -- cgit v1.2.3 From 0b092b352dc29ce69f6f126eb7857a1243a6ef62 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 11:46:06 +0100 Subject: Extract testing conventions from Kotlin conventions This change extracts the configuration for test from the Kotlin library conventions. --- .../opendc-experiments/opendc-experiments-sc18/build.gradle.kts | 5 +---- .../opendc-experiments/opendc-experiments-sc20/build.gradle.kts | 6 ++---- 2 files changed, 3 insertions(+), 8 deletions(-) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index 2d5e018b..f02d1e38 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -24,7 +24,7 @@ description = "Experiments for the TPDS paper" /* Build configuration */ plugins { - `kotlin-library-convention` + `kotlin-library-conventions` application } @@ -43,7 +43,4 @@ dependencies { exclude("org.jetbrains.kotlin", module = "kotlin-reflect") } implementation(kotlin("reflect")) - - testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") - testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") } diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts index 8fac6524..271852b3 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts @@ -24,7 +24,8 @@ description = "Experiments for the SC20 paper" /* Build configuration */ plugins { - `kotlin-library-convention` + `kotlin-library-conventions` + `testing-conventions` application } @@ -51,7 +52,4 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } - - testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") - testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") } -- cgit v1.2.3 From 90de768b8bfb3acc222f508d2e602ef3b7f1ff91 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 12:05:59 +0100 Subject: Move dependency versions to gradle.properties This change moves the version of the dependencies from buildSrc to gradle.properties to prevent recompilation when changing dependency versions. --- simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts | 4 ---- simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index f02d1e38..9c21a8c6 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -39,8 +39,4 @@ dependencies { implementation(project(":opendc-workflows")) implementation(project(":opendc-simulator:opendc-simulator-core")) implementation(project(":opendc-compute:opendc-compute-simulator")) - implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { - exclude("org.jetbrains.kotlin", module = "kotlin-reflect") - } - implementation(kotlin("reflect")) } diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts index 271852b3..6592c86f 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts @@ -43,7 +43,7 @@ dependencies { implementation(project(":opendc-simulator:opendc-simulator-failures")) implementation(project(":opendc-compute:opendc-compute-simulator")) - implementation("io.github.microutils:kotlin-logging:2.0.4") + implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}") implementation("me.tongfei:progressbar:0.9.0") implementation("com.github.ajalt.clikt:clikt:3.1.0") -- cgit v1.2.3 From 2119427fe6f7874867c6985cacda28befc53436b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 12:37:31 +0100 Subject: Utilize version constraints for shared versions This change updates the Gradle configuration to utilize version constraints to force the same dependency version across modules. --- .../opendc-experiments-sc20/build.gradle.kts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts index 6592c86f..8127d9eb 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts @@ -43,12 +43,12 @@ dependencies { implementation(project(":opendc-simulator:opendc-simulator-failures")) implementation(project(":opendc-compute:opendc-compute-simulator")) - implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}") - implementation("me.tongfei:progressbar:0.9.0") - implementation("com.github.ajalt.clikt:clikt:3.1.0") + implementation("io.github.microutils:kotlin-logging") + implementation("me.tongfei:progressbar:${versions["progressbar"]}") + implementation("com.github.ajalt.clikt:clikt:${versions["clikt"]}") - implementation("org.apache.parquet:parquet-avro:1.11.0") - implementation("org.apache.hadoop:hadoop-client:3.2.1") { + implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}") + implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } -- cgit v1.2.3 From e99ceb1575f50d98ad61f5101b7b69ab69453d70 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 20:38:31 +0100 Subject: exp: Support running SC18 experiment from within IntelliJ --- .../opendc-experiments/opendc-experiments-sc18/build.gradle.kts | 8 ++------ .../org/opendc/experiments/sc18/UnderspecificationExperiment.kt | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index 9c21a8c6..f4d947d8 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -20,16 +20,12 @@ * SOFTWARE. */ -description = "Experiments for the TPDS paper" +description = "Experiments for the SC18 article" /* Build configuration */ plugins { `kotlin-library-conventions` - application -} - -application { - mainClass.set("org.opendc.harness.runner.console.ConsoleRunnerKt") + `experiment-conventions` } dependencies { diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 6d2c0ec7..3843f198 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal -- cgit v1.2.3 From 481706adc89d502840c967d94a165c55d8ac0e1a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 21:30:51 +0100 Subject: exp: Add support for running Capelin experiments in IntelliJ --- .../opendc-experiments-capelin/.gitignore | 2 + .../opendc-experiments-capelin/build.gradle.kts | 50 ++ .../capelin/CompositeWorkloadPortfolio.kt | 79 +++ .../experiments/capelin/ExperimentHelpers.kt | 276 +++++++++ .../opendc/experiments/capelin/HorVerPortfolio.kt | 60 ++ .../opendc/experiments/capelin/MoreHpcPortfolio.kt | 59 ++ .../experiments/capelin/MoreVelocityPortfolio.kt | 56 ++ .../capelin/OperationalPhenomenaPortfolio.kt | 61 ++ .../org/opendc/experiments/capelin/Portfolio.kt | 221 ++++++++ .../opendc/experiments/capelin/ReplayPortfolio.kt | 50 ++ .../opendc/experiments/capelin/TestPortfolio.kt | 47 ++ .../capelin/model/OperationalPhenomena.kt | 31 + .../opendc/experiments/capelin/model/Topology.kt | 28 + .../opendc/experiments/capelin/model/Workload.kt | 44 ++ .../capelin/monitor/ExperimentMonitor.kt | 75 +++ .../capelin/monitor/ParquetExperimentMonitor.kt | 202 +++++++ .../opendc/experiments/capelin/telemetry/Event.kt | 35 ++ .../experiments/capelin/telemetry/HostEvent.kt | 43 ++ .../capelin/telemetry/ProvisionerEvent.kt | 39 ++ .../experiments/capelin/telemetry/RunEvent.kt | 34 ++ .../experiments/capelin/telemetry/VmEvent.kt | 41 ++ .../telemetry/parquet/ParquetEventWriter.kt | 126 +++++ .../telemetry/parquet/ParquetHostEventWriter.kt | 81 +++ .../parquet/ParquetProvisionerEventWriter.kt | 65 +++ .../telemetry/parquet/ParquetRunEventWriter.kt | 72 +++ .../capelin/trace/Sc20ParquetTraceReader.kt | 94 ++++ .../capelin/trace/Sc20RawParquetTraceReader.kt | 170 ++++++ .../trace/Sc20StreamingParquetTraceReader.kt | 305 ++++++++++ .../capelin/trace/Sc20TraceConverter.kt | 621 +++++++++++++++++++++ .../experiments/capelin/trace/WorkloadSampler.kt | 210 +++++++ .../src/main/resources/log4j2.xml | 49 ++ .../experiments/capelin/CapelinIntegrationTest.kt | 256 +++++++++ .../src/test/resources/env/single.txt | 3 + .../src/test/resources/env/topology.txt | 5 + .../src/test/resources/trace/meta.parquet | Bin 0 -> 2148 bytes .../src/test/resources/trace/trace.parquet | Bin 0 -> 1672463 bytes .../opendc-experiments-sc18/.gitignore | 2 + .../sc18/UnderspecificationExperiment.kt | 4 +- .../opendc-experiments-sc20/build.gradle.kts | 55 -- .../sc20/experiment/CompositeWorkloadPortfolio.kt | 79 --- .../sc20/experiment/ExperimentHelpers.kt | 276 --------- .../experiments/sc20/experiment/HorVerPortfolio.kt | 60 -- .../sc20/experiment/MoreHpcPortfolio.kt | 56 -- .../sc20/experiment/MoreVelocityPortfolio.kt | 56 -- .../experiment/OperationalPhenomenaPortfolio.kt | 61 -- .../experiments/sc20/experiment/Portfolio.kt | 217 ------- .../experiments/sc20/experiment/ReplayPortfolio.kt | 50 -- .../experiments/sc20/experiment/TestPortfolio.kt | 47 -- .../sc20/experiment/model/OperationalPhenomena.kt | 33 -- .../experiments/sc20/experiment/model/Topology.kt | 30 - .../experiments/sc20/experiment/model/Workload.kt | 44 -- .../sc20/experiment/monitor/ExperimentMonitor.kt | 75 --- .../experiment/monitor/ParquetExperimentMonitor.kt | 202 ------- .../org/opendc/experiments/sc20/telemetry/Event.kt | 35 -- .../opendc/experiments/sc20/telemetry/HostEvent.kt | 43 -- .../experiments/sc20/telemetry/ProvisionerEvent.kt | 39 -- .../opendc/experiments/sc20/telemetry/RunEvent.kt | 34 -- .../opendc/experiments/sc20/telemetry/VmEvent.kt | 41 -- .../sc20/telemetry/parquet/ParquetEventWriter.kt | 126 ----- .../telemetry/parquet/ParquetHostEventWriter.kt | 81 --- .../parquet/ParquetProvisionerEventWriter.kt | 65 --- .../telemetry/parquet/ParquetRunEventWriter.kt | 72 --- .../sc20/trace/Sc20ParquetTraceReader.kt | 94 ---- .../sc20/trace/Sc20RawParquetTraceReader.kt | 170 ------ .../sc20/trace/Sc20StreamingParquetTraceReader.kt | 305 ---------- .../experiments/sc20/trace/Sc20TraceConverter.kt | 621 --------------------- .../experiments/sc20/trace/WorkloadSampler.kt | 210 ------- .../src/main/resources/log4j2.xml | 49 -- .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 256 --------- .../src/test/resources/env/single.txt | 3 - .../src/test/resources/env/topology.txt | 5 - .../src/test/resources/trace/meta.parquet | Bin 2148 -> 0 bytes .../src/test/resources/trace/trace.parquet | Bin 1672463 -> 0 bytes 73 files changed, 3594 insertions(+), 3592 deletions(-) create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/.gitignore create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet create mode 100644 simulator/opendc-experiments/opendc-experiments-sc18/.gitignore delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore b/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore new file mode 100644 index 00000000..ba64707c --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/.gitignore @@ -0,0 +1,2 @@ +input/ +output/ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts new file mode 100644 index 00000000..041cf36a --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Experiments for the Capelin work" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `experiment-conventions` + `testing-conventions` +} + +dependencies { + api(project(":opendc-core")) + api(project(":opendc-harness")) + implementation(project(":opendc-format")) + implementation(project(":opendc-simulator:opendc-simulator-core")) + implementation(project(":opendc-simulator:opendc-simulator-compute")) + implementation(project(":opendc-simulator:opendc-simulator-failures")) + implementation(project(":opendc-compute:opendc-compute-simulator")) + + implementation("io.github.microutils:kotlin-logging") + implementation("me.tongfei:progressbar:${versions["progressbar"]}") + implementation("com.github.ajalt.clikt:clikt:${versions["clikt"]}") + + implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}") + implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt new file mode 100644 index 00000000..faabe5cb --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the effect of a composite workload. + */ +public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { + private val totalSampleLoad = 1.3301733005049648E12 + + override val topology: Topology by anyOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workload: Workload by anyOf( + CompositeWorkload( + "all-azure", + listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-25-azure-75", + listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-50-azure-50", + listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-75-azure-25", + listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), + totalSampleLoad + ), + CompositeWorkload( + "all-solvinity", + listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), + totalSampleLoad + ) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt new file mode 100644 index 00000000..8f3e686a --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.experiment + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.core.Flavor +import org.opendc.compute.core.ServerEvent +import org.opendc.compute.core.metal.NODE_CLUSTER +import org.opendc.compute.core.metal.driver.BareMetalDriver +import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimBareMetalDriver +import org.opendc.compute.simulator.SimVirtDriver +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.experiments.capelin.monitor.ExperimentMonitor +import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.failures.CorrelatedFaultInjector +import org.opendc.simulator.failures.FailureDomain +import org.opendc.simulator.failures.FaultInjector +import org.opendc.trace.core.EventTracer +import java.io.File +import java.time.Clock +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random + +/** + * The logger for this experiment. + */ +private val logger = KotlinLogging.logger {} + +/** + * Construct the failure domain for the experiments. + */ +public suspend fun createFailureDomain( + coroutineScope: CoroutineScope, + clock: Clock, + seed: Int, + failureInterval: Double, + bareMetalProvisioner: ProvisioningService, + chan: Channel +): CoroutineScope { + val job = coroutineScope.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = + injectors.getOrPut(cluster) { + createFaultInjector( + this, + clock, + random, + failureInterval + ) + } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return CoroutineScope(coroutineScope.coroutineContext + job) +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +public fun createFaultInjector( + coroutineScope: CoroutineScope, + clock: Clock, + random: Random, + failureInterval: Double +): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector( + coroutineScope, + clock, + iatScale = ln(failureInterval), iatShape = 1.03, // Hours + sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 + dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. + */ +public fun createTraceReader( + path: File, + performanceInterferenceModel: PerformanceInterferenceModel, + vms: List, + seed: Int +): Sc20StreamingParquetTraceReader { + return Sc20StreamingParquetTraceReader( + path, + performanceInterferenceModel, + vms, + Random(seed) + ) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +public suspend fun createProvisioner( + coroutineScope: CoroutineScope, + clock: Clock, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy, + eventTracer: EventTracer +): Pair { + val environment = environmentReader.use { it.construct(coroutineScope, clock) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) + + // Wait for the hypervisors to be spawned + delay(10) + + return bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public suspend fun attachMonitor( + coroutineScope: CoroutineScope, + clock: Clock, + scheduler: SimVirtProvisioningService, + monitor: ExperimentMonitor +) { + + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + val server = (hypervisor as SimVirtDriver).server + monitor.reportHostStateChange(clock.millis(), hypervisor, server) + server.events + .onEach { event -> + val time = clock.millis() + when (event) { + is ServerEvent.StateChanged -> { + monitor.reportHostStateChange(time, hypervisor, event.server) + } + } + } + .launchIn(coroutineScope) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer + ) + } + } + .launchIn(coroutineScope) + + val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver + driver.powerDraw + .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .launchIn(coroutineScope) + } + + scheduler.events + .onEach { event -> + when (event) { + is VirtProvisioningEvent.MetricsAvailable -> + monitor.reportProvisionerMetrics(clock.millis(), event) + } + } + .launchIn(coroutineScope) +} + +/** + * Process the trace. + */ +public suspend fun processTrace( + coroutineScope: CoroutineScope, + clock: Clock, + reader: TraceReader, + scheduler: SimVirtProvisioningService, + chan: Channel, + monitor: ExperimentMonitor +) { + try { + var submitted = 0 + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + submitted++ + delay(max(0, time - clock.millis())) + coroutineScope.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, + workload.image, + Flavor( + workload.image.tags["cores"] as Int, + workload.image.tags["required-memory"] as Long + ) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + monitor.reportVmStateChange(clock.millis(), it.server) + } + } + .collect() + } + } + + scheduler.events + .takeWhile { + when (it) { + is VirtProvisioningEvent.MetricsAvailable -> + it.inactiveVmCount + it.failedVmCount != submitted + } + } + .collect() + delay(1) + } finally { + reader.close() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt new file mode 100644 index 00000000..e1cf8517 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the difference between horizontal and vertical scaling. + */ +public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { + override val topology: Topology by anyOf( + Topology("base"), + Topology("rep-vol-hor-hom"), + Topology("rep-vol-hor-het"), + Topology("rep-vol-ver-hom"), + Topology("rep-vol-ver-het"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-hor-het"), + Topology("exp-vol-ver-hom"), + Topology("exp-vol-ver-het") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt new file mode 100644 index 00000000..a995e467 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.SamplingStrategy +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] to explore the effect of HPC workloads. + */ +public class MoreHpcPortfolio : Portfolio("more_hpc") { + override val topology: Topology by anyOf( + Topology("base"), + Topology("exp-vol-hor-hom"), + Topology("exp-vol-ver-hom"), + Topology("exp-vel-ver-hom") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), + Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), + Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), + Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt new file mode 100644 index 00000000..49559e0e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). + */ +public class MoreVelocityPortfolio : Portfolio("more_velocity") { + override val topology: Topology by anyOf( + Topology("base"), + Topology("rep-vel-ver-hom"), + Topology("rep-vel-ver-het"), + Topology("exp-vel-ver-hom"), + Topology("exp-vel-ver-het") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf( + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt new file mode 100644 index 00000000..1aac4f9e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that explores the effect of operational phenomena on metrics. + */ +public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") { + override val topology: Topology by anyOf( + Topology("base") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), + Workload("solvinity", 0.5), + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) + ) + + override val allocationPolicy: String by anyOf( + "mem", + "mem-inv", + "core-mem", + "core-mem-inv", + "active-servers", + "active-servers-inv", + "random" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt new file mode 100644 index 00000000..75b0d735 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import mu.KotlinLogging +import org.opendc.compute.simulator.allocation.* +import org.opendc.experiments.capelin.experiment.attachMonitor +import org.opendc.experiments.capelin.experiment.createFailureDomain +import org.opendc.experiments.capelin.experiment.createProvisioner +import org.opendc.experiments.capelin.experiment.processTrace +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor +import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader +import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import org.opendc.format.trace.PerformanceInterferenceModelReader +import org.opendc.harness.dsl.Experiment +import org.opendc.harness.dsl.anyOf +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer +import java.io.File +import java.util.concurrent.ConcurrentHashMap +import kotlin.random.Random + +/** + * A portfolio represents a collection of scenarios are tested for the work. + * + * @param name The name of the portfolio. + */ +public abstract class Portfolio(name: String) : Experiment(name) { + /** + * The logger for this portfolio instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The path to where the environments are located. + */ + private val environmentPath by anyOf(File("input/environments/")) + + /** + * The path to where the traces are located. + */ + private val tracePath by anyOf(File("input/traces/")) + + /** + * The path to where the output results should be written. + */ + private val outputPath by anyOf(File("output/")) + + /** + * The path to the original VM placements file. + */ + private val vmPlacements by anyOf(emptyMap()) + + /** + * The path to the performance interference model. + */ + private val performanceInterferenceModel by anyOf(null) + + /** + * The topology to test. + */ + public abstract val topology: Topology + + /** + * The workload to test. + */ + public abstract val workload: Workload + + /** + * The operational phenomenas to consider. + */ + public abstract val operationalPhenomena: OperationalPhenomena + + /** + * The allocation policies to consider. + */ + public abstract val allocationPolicy: String + + /** + * A map of trace readers. + */ + private val traceReaders = ConcurrentHashMap() + + /** + * Perform a single trial for this portfolio. + */ + @OptIn(ExperimentalCoroutinesApi::class) + override fun doRun(repeat: Int) { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) + val seeder = Random(repeat) + val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) + + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = createAllocationPolicy(seeder) + + val workload = workload + val workloadNames = if (workload is CompositeWorkload) { + workload.workloads.map { it.name } + } else { + listOf(workload.name) + } + + val rawReaders = workloadNames.map { workloadName -> + traceReaders.computeIfAbsent(workloadName) { + logger.info { "Loading trace $workloadName" } + Sc20RawParquetTraceReader(File(tracePath, workloadName)) + } + } + + val performanceInterferenceModel = performanceInterferenceModel + ?.takeIf { operationalPhenomena.hasInterference } + ?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) + + val monitor = ParquetExperimentMonitor( + outputPath, + "portfolio_id=$name/scenario_id=$id/run_id=$repeat", + 4096 + ) + + testScope.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + this, + clock, + environment, + allocationPolicy, + tracer + ) + + val failureDomain = if (operationalPhenomena.failureFrequency > 0) { + logger.debug("ENABLING failures") + createFailureDomain( + this, + clock, + seeder.nextInt(), + operationalPhenomena.failureFrequency, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(this, clock, scheduler, monitor) + processTrace( + this, + clock, + trace, + scheduler, + chan, + monitor + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + testScope.advanceUntilIdle() + } finally { + monitor.close() + } + } + + /** + * Create the [AllocationPolicy] instance to use for the trial. + */ + private fun createAllocationPolicy(seeder: Random): AllocationPolicy { + return when (allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt new file mode 100644 index 00000000..b6d3b30c --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] that compares the original VM placements against our policies. + */ +public class ReplayPortfolio : Portfolio("replay") { + override val topology: Topology by anyOf( + Topology("base") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) + ) + + override val allocationPolicy: String by anyOf( + "replay", + "active-servers" + ) +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt new file mode 100644 index 00000000..90840db8 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.harness.dsl.anyOf + +/** + * A [Portfolio] to perform a simple test run. + */ +public class TestPortfolio : Portfolio("test") { + override val topology: Topology by anyOf( + Topology("base") + ) + + override val workload: Workload by anyOf( + Workload("solvinity", 1.0) + ) + + override val operationalPhenomena: OperationalPhenomena by anyOf( + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + ) + + override val allocationPolicy: String by anyOf("active-servers") +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt new file mode 100644 index 00000000..b53b3617 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.model + +/** + * Operation phenomena during experiments. + * + * @param failureFrequency The average time between failures in hours. + * @param hasInterference A flag to enable performance interference between VMs. + */ +public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt new file mode 100644 index 00000000..fe16a294 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.model + +/** + * The topology topology on which we test the workload. + */ +public data class Topology(val name: String) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt new file mode 100644 index 00000000..c4ddd158 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.model + +public enum class SamplingStrategy { + REGULAR, + HPC, + HPC_LOAD +} + +/** + * A workload that is considered for a scenario. + */ +public open class Workload( + public open val name: String, + public val fraction: Double, + public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR +) + +/** + * A workload that is composed of multiple workloads. + */ +public class CompositeWorkload(override val name: String, public val workloads: List, public val totalLoad: Double) : + Workload(name, -1.0) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt new file mode 100644 index 00000000..3c6637bf --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.monitor + +import org.opendc.compute.core.Server +import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import java.io.Closeable + +/** + * A monitor watches the events of an experiment. + */ +public interface ExperimentMonitor : Closeable { + /** + * This method is invoked when the state of a VM changes. + */ + public fun reportVmStateChange(time: Long, server: Server) {} + + /** + * This method is invoked when the state of a host changes. + */ + public fun reportHostStateChange( + time: Long, + driver: VirtDriver, + server: Server + ) { + } + + /** + * Report the power consumption of a host. + */ + public fun reportPowerConsumption(host: Server, draw: Double) {} + + /** + * This method is invoked for a host for each slice that is finishes. + */ + public fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long = 5 * 60 * 1000L + ) { + } + + /** + * This method is invoked for a provisioner event. + */ + public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt new file mode 100644 index 00000000..a0d57656 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -0,0 +1,202 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.monitor + +import mu.KotlinLogging +import org.opendc.compute.core.Server +import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.experiments.capelin.telemetry.HostEvent +import org.opendc.experiments.capelin.telemetry.ProvisionerEvent +import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter +import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter +import java.io.File + +/** + * The logger instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * An [ExperimentMonitor] that logs the events to a Parquet file. + */ +public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { + private val hostWriter = ParquetHostEventWriter( + File(base, "host-metrics/$partition/data.parquet"), + bufferSize + ) + private val provisionerWriter = ParquetProvisionerEventWriter( + File(base, "provisioner-metrics/$partition/data.parquet"), + bufferSize + ) + private val currentHostEvent = mutableMapOf() + private var startTime = -1L + + override fun reportVmStateChange(time: Long, server: Server) { + if (startTime < 0) { + startTime = time + + // Update timestamp of initial event + currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) } + } + } + + override fun reportHostStateChange( + time: Long, + driver: VirtDriver, + server: Server + ) { + logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + + val previousEvent = currentHostEvent[server] + + val roundedTime = previousEvent?.let { + val duration = time - it.timestamp + val k = 5 * 60 * 1000L // 5 min in ms + val rem = duration % k + + if (rem == 0L) { + time + } else { + it.timestamp + duration + k - rem + } + } ?: time + + reportHostSlice( + roundedTime, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server + ) + } + + private val lastPowerConsumption = mutableMapOf() + + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw + } + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long + ) { + val previousEvent = currentHostEvent[hostServer] + when { + previousEvent == null -> { + val event = HostEvent( + time, + 5 * 60 * 1000L, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + previousEvent.timestamp == time -> { + val event = HostEvent( + time, + previousEvent.duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + else -> { + hostWriter.write(previousEvent) + + val event = HostEvent( + time, + time - previousEvent.timestamp, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + } + } + + override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + provisionerWriter.write( + ProvisionerEvent( + time, + event.totalHostCount, + event.availableHostCount, + event.totalVmCount, + event.activeVmCount, + event.inactiveVmCount, + event.waitingVmCount, + event.failedVmCount + ) + ) + } + + override fun close() { + // Flush remaining events + for ((_, event) in currentHostEvent) { + hostWriter.write(event) + } + currentHostEvent.clear() + + hostWriter.close() + provisionerWriter.close() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt new file mode 100644 index 00000000..c29e116e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +/** + * An event that occurs within the system. + */ +public abstract class Event(public val name: String) { + /** + * The time of occurrence of this event. + */ + public abstract val timestamp: Long +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt new file mode 100644 index 00000000..e5e9d520 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +import org.opendc.compute.core.Server + +/** + * A periodic report of the host machine metrics. + */ +public data class HostEvent( + override val timestamp: Long, + public val duration: Long, + public val host: Server, + public val vmCount: Int, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val overcommissionedBurst: Long, + public val interferedBurst: Long, + public val cpuUsage: Double, + public val cpuDemand: Double, + public val powerDraw: Double, + public val cores: Int +) : Event("host-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt new file mode 100644 index 00000000..539c9bc9 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +/** + * A periodic report of the provisioner's metrics. + */ +public data class ProvisionerEvent( + override val timestamp: Long, + public val totalHostCount: Int, + public val availableHostCount: Int, + public val totalVmCount: Int, + public val activeVmCount: Int, + public val inactiveVmCount: Int, + public val waitingVmCount: Int, + public val failedVmCount: Int +) : Event("provisioner-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt new file mode 100644 index 00000000..6c8fc941 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +import org.opendc.experiments.capelin.Portfolio + +/** + * A periodic report of the host machine metrics. + */ +public data class RunEvent( + val portfolio: Portfolio, + val repeat: Int, + override val timestamp: Long +) : Event("run") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt new file mode 100644 index 00000000..427c453a --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry + +import org.opendc.compute.core.Server + +/** + * A periodic report of a virtual machine's metrics. + */ +public data class VmEvent( + override val timestamp: Long, + public val duration: Long, + public val vm: Server, + public val host: Server, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val overcommissionedBurst: Long, + public val interferedBurst: Long, + public val cpuUsage: Double, + public val cpuDemand: Double +) : Event("vm-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt new file mode 100644 index 00000000..38930ee5 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.experiments.capelin.telemetry.Event +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * The logging instance to use. + */ +private val logger = KotlinLogging.logger {} + +/** + * A writer that writes events in Parquet format. + */ +public open class ParquetEventWriter( + private val path: File, + private val schema: Schema, + private val converter: (T, GenericData.Record) -> Unit, + private val bufferSize: Int = 4096 +) : Runnable, Closeable { + /** + * The writer to write the Parquet file. + */ + private val writer = AvroParquetWriter.builder(Path(path.absolutePath)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = "parquet-writer") { run() } + + /** + * Write the specified metrics to the database. + */ + public fun write(event: T) { + queue.put(Action.Write(event)) + } + + /** + * Signal the writer to stop. + */ + public override fun close() { + queue.put(Action.Stop) + writerThread.join() + } + + init { + writerThread.start() + } + + /** + * Start the writer thread. + */ + override fun run() { + try { + loop@ while (true) { + val action = queue.take() + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> { + val record = GenericData.Record(schema) + @Suppress("UNCHECKED_CAST") + converter(action.event as T, record) + writer.write(record) + } + } + } + } catch (e: Throwable) { + logger.error("Writer failed", e) + } finally { + writer.close() + } + } + + public sealed class Action { + /** + * A poison pill that will stop the writer thread. + */ + public object Stop : Action() + + /** + * Write the specified metrics to the database. + */ + public data class Write(val event: T) : Action() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt new file mode 100644 index 00000000..4a3e7963 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.experiments.capelin.telemetry.HostEvent +import java.io.File + +/** + * A Parquet event writer for [HostEvent]s. + */ +public class ParquetHostEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "host-writer" + + public companion object { + private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> + // record.put("portfolio_id", event.run.parent.parent.id) + // record.put("scenario_id", event.run.parent.id) + // record.put("run_id", event.run.id) + record.put("host_id", event.host.name) + record.put("state", event.host.state.name) + record.put("timestamp", event.timestamp) + record.put("duration", event.duration) + record.put("vm_count", event.vmCount) + record.put("requested_burst", event.requestedBurst) + record.put("granted_burst", event.grantedBurst) + record.put("overcommissioned_burst", event.overcommissionedBurst) + record.put("interfered_burst", event.interferedBurst) + record.put("cpu_usage", event.cpuUsage) + record.put("cpu_demand", event.cpuDemand) + record.put("power_draw", event.powerDraw * (1.0 / 12)) + record.put("cores", event.cores) + } + + private val schema: Schema = SchemaBuilder + .record("host_metrics") + .namespace("org.opendc.experiments.sc20") + .fields() + // .name("portfolio_id").type().intType().noDefault() + // .name("scenario_id").type().intType().noDefault() + // .name("run_id").type().intType().noDefault() + .name("timestamp").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("vm_count").type().intType().noDefault() + .name("requested_burst").type().longType().noDefault() + .name("granted_burst").type().longType().noDefault() + .name("overcommissioned_burst").type().longType().noDefault() + .name("interfered_burst").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("cores").type().intType().noDefault() + .endRecord() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt new file mode 100644 index 00000000..8feff8d9 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.experiments.capelin.telemetry.ProvisionerEvent +import java.io.File + +/** + * A Parquet event writer for [ProvisionerEvent]s. + */ +public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "provisioner-writer" + + public companion object { + private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> + record.put("timestamp", event.timestamp) + record.put("host_total_count", event.totalHostCount) + record.put("host_available_count", event.availableHostCount) + record.put("vm_total_count", event.totalVmCount) + record.put("vm_active_count", event.activeVmCount) + record.put("vm_inactive_count", event.inactiveVmCount) + record.put("vm_waiting_count", event.waitingVmCount) + record.put("vm_failed_count", event.failedVmCount) + } + + private val schema: Schema = SchemaBuilder + .record("provisioner_metrics") + .namespace("org.opendc.experiments.sc20") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_total_count").type().intType().noDefault() + .name("host_available_count").type().intType().noDefault() + .name("vm_total_count").type().intType().noDefault() + .name("vm_active_count").type().intType().noDefault() + .name("vm_inactive_count").type().intType().noDefault() + .name("vm_waiting_count").type().intType().noDefault() + .name("vm_failed_count").type().intType().noDefault() + .endRecord() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt new file mode 100644 index 00000000..946410eb --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.telemetry.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.experiments.capelin.telemetry.RunEvent +import java.io.File + +/** + * A Parquet event writer for [RunEvent]s. + */ +public class ParquetRunEventWriter(path: File, bufferSize: Int) : + ParquetEventWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "run-writer" + + public companion object { + private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> + val portfolio = event.portfolio + record.put("portfolio_name", portfolio.name) + record.put("scenario_id", portfolio.id) + record.put("run_id", event.repeat) + record.put("topology", portfolio.topology.name) + record.put("workload_name", portfolio.workload.name) + record.put("workload_fraction", portfolio.workload.fraction) + record.put("workload_sampler", portfolio.workload.samplingStrategy) + record.put("allocation_policy", portfolio.allocationPolicy) + record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) + record.put("interference", portfolio.operationalPhenomena.hasInterference) + record.put("seed", event.repeat) + } + + private val schema: Schema = SchemaBuilder + .record("runs") + .namespace("org.opendc.experiments.sc20") + .fields() + .name("portfolio_name").type().stringType().noDefault() + .name("scenario_id").type().intType().noDefault() + .name("run_id").type().intType().noDefault() + .name("topology").type().stringType().noDefault() + .name("workload_name").type().stringType().noDefault() + .name("workload_fraction").type().doubleType().noDefault() + .name("workload_sampler").type().stringType().noDefault() + .name("allocation_policy").type().stringType().noDefault() + .name("failure_frequency").type().doubleType().noDefault() + .name("interference").type().booleanType().noDefault() + .name("seed").type().intType().noDefault() + .endRecord() + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..6cfdae40 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.Workload +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import java.util.TreeSet + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param reader The internal trace reader to use. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + * @param run The run to which this reader belongs. + */ +@OptIn(ExperimentalStdlibApi::class) +public class Sc20ParquetTraceReader( + rawReaders: List, + performanceInterferenceModel: Map, + workload: Workload, + seed: Int +) : TraceReader { + /** + * The iterator over the actual trace. + */ + private val iterator: Iterator> = + rawReaders + .map { it.read() } + .run { + if (workload is CompositeWorkload) { + this.zip(workload.workloads) + } else { + this.zip(listOf(workload)) + } + } + .map { sampleWorkload(it.first, workload, it.second, seed) } + .flatten() + .run { + // Apply performance interference model + if (performanceInterferenceModel.isEmpty()) + this + else { + map { entry -> + val image = entry.workload.image + val id = image.name + val relevantPerformanceInterferenceModelItems = + performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) + + val newImage = + SimWorkloadImage( + image.uid, + image.name, + image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + (image as SimWorkloadImage).workload + ) + val newWorkload = entry.workload.copy(image = newImage) + Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + } + } + } + .iterator() + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt new file mode 100644 index 00000000..d2560d62 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.core.User +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.io.File +import java.util.UUID + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param path The directory of the traces. + */ +@OptIn(ExperimentalStdlibApi::class) +public class Sc20RawParquetTraceReader(private val path: File) { + /** + * Read the fragments into memory. + */ + private fun parseFragments(path: File): Map> { + val reader = AvroParquetReader.builder(Path(path.absolutePath, "trace.parquet")) + .disableCompatibility() + .build() + + val fragments = mutableMapOf>() + + return try { + while (true) { + val record = reader.read() ?: break + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = SimTraceWorkload.Fragment( + duration, + cpuUsage, + cores + ) + + fragments.getOrPut(id) { mutableListOf() }.add(fragment) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(path: File, fragments: Map>): List { + val metaReader = AvroParquetReader.builder(Path(path.absolutePath, "meta.parquet")) + .disableCompatibility() + .build() + + var counter = 0 + val entries = mutableListOf() + + return try { + while (true) { + val record = metaReader.read() ?: break + + val id = record["id"].toString() + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val vmFragments = fragments.getValue(id).asSequence() + val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs + val vmWorkload = VmWorkload( + uid, + id, + UnnamedUser, + SimWorkloadImage( + uid, + id, + mapOf( + "submit-time" to submissionTime, + "end-time" to endTime, + "total-load" to totalLoad, + "cores" to maxCores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(vmFragments) + ) + ) + entries.add(TraceEntryImpl(submissionTime, vmWorkload)) + } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + metaReader.close() + } + } + + /** + * The entries in the trace. + */ + private val entries: List + + init { + val fragments = parseFragments(path) + entries = parseMeta(path, fragments) + } + + /** + * Read the entries in the trace. + */ + public fun read(): List> = entries + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + internal data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt new file mode 100644 index 00000000..12705c80 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.io.api.Binary +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.core.User +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +public class Sc20StreamingParquetTraceReader( + traceFile: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List, + random: Random +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * The intermediate buffer to store the read records in. + */ + private val queue = ArrayBlockingQueue>(1024) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get( + FilterApi.userDefined( + FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) + ) + ) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = SimTraceWorkload.Fragment( + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map>>) { + if (!hasNext) { + return + } + + val fragments = mutableListOf>() + queue.drainTo(fragments) + + for ((id, fragment) in fragments) { + if (id == poison.first) { + hasNext = false + return + } + buffers[id]?.forEach { it.add(fragment) } + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. + */ + init { + val takenIds = mutableSetOf() + val entries = mutableMapOf() + val buffers = mutableMapOf>>() + + val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + entries[id] = record + } + + metaReader.close() + + val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + logger.info("Processing VM $id") + + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence { + var time = submissionTime + repeat@ while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } + } + + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() + + for (fragment in internalBuffer) { + yield(fragment) + + time += fragment.duration + if (time >= endTime) { + break@repeat + } + } + + internalBuffer.clear() + } + + buffers.remove(id) + } + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uid, + "VM Workload $id", + UnnamedUser, + SimWorkloadImage( + uid, + id, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory + ), + SimTraceWorkload(fragments), + ) + ) + + TraceEntryImpl( + submissionTime, + vmWorkload + ) + } + .sortedBy { it.submissionTime } + .toList() + .iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() { + readerThread.interrupt() + } + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt new file mode 100644 index 00000000..7713c06f --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt @@ -0,0 +1,621 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.options.split +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.long +import me.tongfei.progressbar.ProgressBar +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.format.trace.sc20.Sc20VmPlacementReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.Random +import kotlin.math.max +import kotlin.math.min + +/** + * Represents the command for converting traces + */ +public class TraceConverterCli : CliktCommand(name = "trace-converter") { + /** + * The directory where the trace should be stored. + */ + private val outputPath by option("-O", "--output", help = "path to store the trace") + .file(canBeFile = false, mustExist = false) + .defaultLazy { File("output") } + + /** + * The directory where the input trace is located. + */ + private val inputPath by argument("input", help = "path to the input trace") + .file(canBeFile = false) + + /** + * The input type of the trace. + */ + private val type by option("-t", "--type", help = "input type of trace").groupChoice( + "solvinity" to SolvinityConversion(), + "bitbrains" to BitbrainsConversion(), + "azure" to AzureConversion() + ) + + override fun run() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("org.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("org.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val metaParquet = File(outputPath, "meta.parquet") + val traceParquet = File(outputPath, "trace.parquet") + + if (metaParquet.exists()) { + metaParquet.delete() + } + if (traceParquet.exists()) { + traceParquet.delete() + } + + val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI())) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val writer = AvroParquetWriter.builder(Path(traceParquet.toURI())) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + try { + val type = type ?: throw IllegalArgumentException("Invalid trace conversion") + val allFragments = type.read(inputPath, metaSchema, metaWriter) + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + } finally { + writer.close() + metaWriter.close() + } + } +} + +/** + * The supported trace conversions. + */ +public sealed class TraceConversion(name: String) : OptionGroup(name) { + /** + * Read the fragments of the trace. + */ + public abstract fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList +} + +public class SolvinityConversion : TraceConversion("Solvinity") { + private val clusters by option() + .split(",") + + private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") + .file(canBeDir = false) + .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } + .required() + + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val clusters = clusters?.toSet() ?: emptySet() + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + // Identify start time of the entire trace + var minTimestamp = Long.MAX_VALUE + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach file@{ vmFile -> + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + + val values = line.split("\t") + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + if (timestamp < minTimestamp) { + minTimestamp = timestamp + } + return@file + } + } + } + } + + println("Start of trace at $minTimestamp") + + val allFragments = mutableListOf() + + val begin = 15 * 24 * 60 * 60 * 1000L + val end = 45 * 24 * 60 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach { vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores: Int + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split("\t") + + vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + + val timestamp = + (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp + if (begin > timestamp || timestamp > end) { + continue + } + + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + if (minTime in begin until end) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + } + + return allFragments + } +} + +/** + * Conversion of the Bitbrains public trace. + */ +public class BitbrainsConversion : TraceConversion("Bitbrains") { + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val timestampCol = 0 + val cpuUsageCol = 3 + val coreCol = 1 + val provisionedMemoryCol = 5 + val traceInterval = 5 * 60 * 1000L + + val allFragments = mutableListOf() + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach { vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores: Int + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .drop(1) + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(";\t") + + vmId = vmFile.name + + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + cores = values[coreCol].trim().toInt() + val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB + requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments + } +} + +/** + * Conversion of the Azure public VM trace. + */ +public class AzureConversion : TraceConversion("Azure") { + private val seed by option(help = "seed for trace sampling") + .long() + .default(0) + + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val random = Random(seed) + val fraction = 0.01 + + // Read VM table + val vmIdTableCol = 0 + val coreTableCol = 9 + val provisionedMemoryTableCol = 10 + + var vmId: String + var cores: Int + var requiredMemory: Long + + val vmIds = mutableSetOf() + val vmIdToMetadata = mutableMapOf() + + BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> + reader.lineSequence() + .chunked(1024) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + // Sample only a fraction of the VMs + if (random.nextDouble() > fraction) { + continue + } + + val values = line.split(",") + + // Exclude VMs with a large number of cores (not specified exactly) + if (values[coreTableCol].contains(">")) { + continue + } + + vmId = values[vmIdTableCol].trim() + cores = values[coreTableCol].trim().toInt() + requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB + + vmIds.add(vmId) + vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) + } + } + } + + // Read VM metric reading files + val timestampCol = 0 + val vmIdCol = 1 + val cpuUsageCol = 4 + val traceInterval = 5 * 60 * 1000L + + val vmIdToFragments = mutableMapOf>() + val vmIdToLastFragment = mutableMapOf() + val allFragments = mutableListOf() + + for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { + val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") + var timestamp: Long + var cpuUsage: Double + + BufferedReader(FileReader(readingsFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(",") + vmId = values[vmIdCol].trim() + + // Ignore readings for VMs not in the sample + if (!vmIds.contains(vmId)) { + continue + } + + timestamp = values[timestampCol].trim().toLong() * 1000L + vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) + cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz + vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + val lastFragment = vmIdToLastFragment[vmId] + + vmIdToLastFragment[vmId] = + if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { + Fragment( + vmId, + lastFragment.tick, + lastFragment.flops + flops, + lastFragment.duration + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + if (lastFragment != null) { + if (vmIdToFragments[vmId] == null) { + vmIdToFragments[vmId] = mutableListOf() + } + vmIdToFragments[vmId]!!.add(lastFragment) + allFragments.add(lastFragment) + } + fragment + } + } + } + } + } + + for (entry in vmIdToLastFragment) { + if (entry.value != null) { + if (vmIdToFragments[entry.key] == null) { + vmIdToFragments[entry.key] = mutableListOf() + } + vmIdToFragments[entry.key]!!.add(entry.value!!) + } + } + + println("Read ${vmIdToLastFragment.size} VMs") + + for (entry in vmIdToMetadata) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", entry.key) + metaRecord.put("submissionTime", entry.value.minTime) + metaRecord.put("endTime", entry.value.maxTime) + println("${entry.value.minTime} - ${entry.value.maxTime}") + metaRecord.put("maxCores", entry.value.cores) + metaRecord.put("requiredMemory", entry.value.requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments + } +} + +public data class Fragment( + public val id: String, + public val tick: Long, + public val flops: Long, + public val duration: Long, + public val usage: Double, + public val cores: Int +) + +public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) + +/** + * A script to convert a trace in text format into a Parquet trace. + */ +public fun main(args: Array): Unit = TraceConverterCli().main(args) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt new file mode 100644 index 00000000..4d9b9df1 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import mu.KotlinLogging +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.SamplingStrategy +import org.opendc.experiments.capelin.model.Workload +import org.opendc.format.trace.TraceEntry +import java.util.* +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * Sample the workload for the specified [run]. + */ +public fun sampleWorkload( + trace: List>, + workload: Workload, + subWorkload: Workload, + seed: Int +): List> { + return when { + workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) + workload.samplingStrategy == SamplingStrategy.HPC -> + sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false) + workload.samplingStrategy == SamplingStrategy.HPC_LOAD -> + sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true) + else -> + sampleRegularWorkload(trace, workload, workload, seed) + } +} + +/** + * Sample a regular (non-HPC) workload. + */ +public fun sampleRegularWorkload( + trace: List>, + workload: Workload, + subWorkload: Workload, + seed: Int +): List> { + val fraction = subWorkload.fraction + + val shuffled = trace.shuffled(Random(seed)) + val res = mutableListOf>() + val totalLoad = if (workload is CompositeWorkload) { + workload.totalLoad + } else { + shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + } + var currentLoad = 0.0 + + for (entry in shuffled) { + val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res +} + +/** + * Sample a HPC workload. + */ +public fun sampleHpcWorkload( + trace: List>, + workload: Workload, + seed: Int, + sampleOnLoad: Boolean +): List> { + val pattern = Regex("^vm__workload__(ComputeNode|cn).*") + val random = Random(seed) + + val fraction = workload.fraction + val (hpc, nonHpc) = trace.partition { entry -> + val name = entry.workload.image.name + name.matches(pattern) + } + + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } + + val totalLoad = if (workload is CompositeWorkload) { + workload.totalLoad + } else { + trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + } + + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + + val res = mutableListOf>() + + if (sampleOnLoad) { + var currentLoad = 0.0 + for (entry in hpcSequence) { + val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + hpcLoad += entryLoad + hpcCount += 1 + currentLoad += entryLoad + res += entry + } + + for (entry in nonHpcSequence) { + val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + if ((currentLoad + entryLoad) / totalLoad > 1) { + break + } + + nonHpcLoad += entryLoad + nonHpcCount += 1 + currentLoad += entryLoad + res += entry + } + } else { + hpcSequence + .take((fraction * trace.size).toInt()) + .forEach { entry -> + hpcLoad += entry.workload.image.tags.getValue("total-load") as Double + hpcCount += 1 + res.add(entry) + } + + nonHpcSequence + .take(((1 - fraction) * trace.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double + nonHpcCount += 1 + res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res +} + +/** + * Sample a random trace entry. + */ +private fun sample(entry: TraceEntry, i: Int): TraceEntry { + val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) + val image = SimWorkloadImage( + id, + entry.workload.image.name, + entry.workload.image.tags, + (entry.workload.image as SimWorkloadImage).workload + ) + val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) + return VmTraceEntry(vmWorkload, entry.submissionTime) +} + +private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : + TraceEntry diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml new file mode 100644 index 00000000..d1c01b8e --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt new file mode 100644 index 00000000..6a0796f6 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.core.Server +import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy +import org.opendc.experiments.capelin.experiment.attachMonitor +import org.opendc.experiments.capelin.experiment.createFailureDomain +import org.opendc.experiments.capelin.experiment.createProvisioner +import org.opendc.experiments.capelin.experiment.processTrace +import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.capelin.monitor.ExperimentMonitor +import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader +import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer +import java.io.File +import java.time.Clock + +/** + * An integration test suite for the SC20 experiments. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class CapelinIntegrationTest { + /** + * The [TestCoroutineScope] to use. + */ + private lateinit var testScope: TestCoroutineScope + + /** + * The simulation clock to use. + */ + private lateinit var clock: Clock + + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestExperimentReporter + + /** + * Setup the experimental environment. + */ + @BeforeEach + fun setUp() { + testScope = TestCoroutineScope() + clock = DelayControllerClockAdapter(testScope) + + monitor = TestExperimentReporter() + } + + /** + * Tear down the experimental environment. + */ + @AfterEach + fun tearDown() = testScope.cleanupTestCoroutines() + + @Test + fun testLarge() { + val failures = false + val seed = 0 + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader() + val environmentReader = createTestEnvironmentReader() + lateinit var scheduler: SimVirtProvisioningService + val tracer = EventTracer(clock) + + testScope.launch { + val res = createProvisioner( + this, + clock, + environmentReader, + allocationPolicy, + tracer + ) + val bareMetalProvisioner = res.first + scheduler = res.second + + val failureDomain = if (failures) { + println("ENABLING failures") + createFailureDomain( + this, + clock, + seed, + 24.0 * 7, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(this, clock, scheduler, monitor) + processTrace( + this, + clock, + traceReader, + scheduler, + chan, + monitor + ) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + monitor.close() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(1684849230562, monitor.totalRequestedBurst) }, + { assertEquals(447612683996, monitor.totalGrantedBurst) }, + { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, + { assertEquals(0, monitor.totalInterferedBurst) } + ) + } + + @Test + fun testSmall() { + val seed = 1 + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader(0.5, seed) + val environmentReader = createTestEnvironmentReader("single") + lateinit var scheduler: SimVirtProvisioningService + val tracer = EventTracer(clock) + + testScope.launch { + val res = createProvisioner( + this, + clock, + environmentReader, + allocationPolicy, + tracer + ) + scheduler = res.second + + attachMonitor(this, clock, scheduler, monitor) + processTrace( + this, + clock, + traceReader, + scheduler, + chan, + monitor + ) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + scheduler.terminate() + monitor.close() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + ) + } + + /** + * Run the simulation. + */ + private fun runSimulation() = testScope.advanceUntilIdle() + + /** + * Obtain the trace reader for the test. + */ + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader { + return Sc20ParquetTraceReader( + listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), + emptyMap(), + Workload("test", fraction), + seed + ) + } + + /** + * Obtain the environment reader for the test. + */ + private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") + return Sc20ClusterEnvironmentReader(stream) + } + + class TestExperimentReporter : ExperimentMonitor { + var totalRequestedBurst = 0L + var totalGrantedBurst = 0L + var totalOvercommissionedBurst = 0L + var totalInterferedBurst = 0L + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long + ) { + totalRequestedBurst += requestedBurst + totalGrantedBurst += grantedBurst + totalOvercommissionedBurst += overcommissionedBurst + totalInterferedBurst += interferedBurst + } + + override fun close() {} + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt new file mode 100644 index 00000000..53b3c2d7 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt @@ -0,0 +1,3 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;8;3.2;64;1;64;8 + diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt new file mode 100644 index 00000000..6b347bff --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/topology.txt @@ -0,0 +1,5 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;32;3.2;2048;1;256;32 +B01;B01;48;2.93;1256;6;64;8 +C01;C01;32;3.2;2048;2;128;16 + diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet new file mode 100644 index 00000000..ce7a812c Binary files /dev/null and b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet differ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet new file mode 100644 index 00000000..1d7ce882 Binary files /dev/null and b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet differ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore b/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore new file mode 100644 index 00000000..ba64707c --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore @@ -0,0 +1,2 @@ +input/ +output/ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 3843f198..fc979363 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -55,12 +55,12 @@ public class UnderspecificationExperiment : Experiment("underspecification") { /** * The workflow traces to test. */ - private val trace: String by anyOf("traces/chronos_exp_noscaler_ca.gwf") + private val trace: String by anyOf("input/traces/chronos_exp_noscaler_ca.gwf") /** * The datacenter environments to test. */ - private val environment: String by anyOf("environments/base.json") + private val environment: String by anyOf("input/environments/base.json") @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int) { diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts deleted file mode 100644 index 8127d9eb..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/build.gradle.kts +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2019 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Experiments for the SC20 paper" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` - `testing-conventions` - application -} - -application { - mainClass.set("org.opendc.harness.runner.console.ConsoleRunnerKt") - applicationDefaultJvmArgs = listOf("-Xms2500M") -} - -dependencies { - api(project(":opendc-core")) - api(project(":opendc-harness")) - implementation(project(":opendc-format")) - implementation(project(":opendc-simulator:opendc-simulator-core")) - implementation(project(":opendc-simulator:opendc-simulator-compute")) - implementation(project(":opendc-simulator:opendc-simulator-failures")) - implementation(project(":opendc-compute:opendc-compute-simulator")) - - implementation("io.github.microutils:kotlin-logging") - implementation("me.tongfei:progressbar:${versions["progressbar"]}") - implementation("com.github.ajalt.clikt:clikt:${versions["clikt"]}") - - implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}") - implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt deleted file mode 100644 index f4242456..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/CompositeWorkloadPortfolio.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of a composite workload. - */ -public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { - private val totalSampleLoad = 1.3301733005049648E12 - - override val topology: Topology by anyOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - - override val workload: Workload by anyOf( - CompositeWorkload( - "all-azure", - listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-25-azure-75", - listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-50-azure-50", - listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-75-azure-25", - listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), - totalSampleLoad - ), - CompositeWorkload( - "all-solvinity", - listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), - totalSampleLoad - ) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt deleted file mode 100644 index 1e01e892..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch -import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.driver.BareMetalDriver -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.compute.simulator.SimVirtDriver -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AllocationPolicy -import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor -import org.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.failures.CorrelatedFaultInjector -import org.opendc.simulator.failures.FailureDomain -import org.opendc.simulator.failures.FaultInjector -import org.opendc.trace.core.EventTracer -import java.io.File -import java.time.Clock -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** - * Construct the failure domain for the experiments. - */ -public suspend fun createFailureDomain( - coroutineScope: CoroutineScope, - clock: Clock, - seed: Int, - failureInterval: Double, - bareMetalProvisioner: ProvisioningService, - chan: Channel -): CoroutineScope { - val job = coroutineScope.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = - injectors.getOrPut(cluster) { - createFaultInjector( - this, - clock, - random, - failureInterval - ) - } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } - } - return CoroutineScope(coroutineScope.coroutineContext + job) -} - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -public fun createFaultInjector( - coroutineScope: CoroutineScope, - clock: Clock, - random: Random, - failureInterval: Double -): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector( - coroutineScope, - clock, - iatScale = ln(failureInterval), iatShape = 1.03, // Hours - sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 - dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -public fun createTraceReader( - path: File, - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List, - seed: Int -): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - -/** - * Construct the environment for a VM provisioner and return the provisioner instance. - */ -public suspend fun createProvisioner( - coroutineScope: CoroutineScope, - clock: Clock, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy, - eventTracer: EventTracer -): Pair { - val environment = environmentReader.use { it.construct(coroutineScope, clock) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) - - // Wait for the hypervisors to be spawned - delay(10) - - return bareMetalProvisioner to scheduler -} - -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun attachMonitor( - coroutineScope: CoroutineScope, - clock: Clock, - scheduler: SimVirtProvisioningService, - monitor: ExperimentMonitor -) { - - val hypervisors = scheduler.drivers() - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).server - monitor.reportHostStateChange(clock.millis(), hypervisor, server) - server.events - .onEach { event -> - val time = clock.millis() - when (event) { - is ServerEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.server) - } - } - } - .launchIn(coroutineScope) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( - clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer - ) - } - } - .launchIn(coroutineScope) - - val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver - driver.powerDraw - .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } - .launchIn(coroutineScope) - } - - scheduler.events - .onEach { event -> - when (event) { - is VirtProvisioningEvent.MetricsAvailable -> - monitor.reportProvisionerMetrics(clock.millis(), event) - } - } - .launchIn(coroutineScope) -} - -/** - * Process the trace. - */ -public suspend fun processTrace( - coroutineScope: CoroutineScope, - clock: Clock, - reader: TraceReader, - scheduler: SimVirtProvisioningService, - chan: Channel, - monitor: ExperimentMonitor -) { - try { - var submitted = 0 - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - submitted++ - delay(max(0, time - clock.millis())) - coroutineScope.launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, - workload.image, - Flavor( - workload.image.tags["cores"] as Int, - workload.image.tags["required-memory"] as Long - ) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(clock.millis(), it.server) - } - } - .collect() - } - } - - scheduler.events - .takeWhile { - when (it) { - is VirtProvisioningEvent.MetricsAvailable -> - it.inactiveVmCount + it.failedVmCount != submitted - } - } - .collect() - delay(1) - } finally { - reader.close() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt deleted file mode 100644 index aa97b808..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/HorVerPortfolio.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the difference between horizontal and vertical scaling. - */ -public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("rep-vol-hor-hom"), - Topology("rep-vol-hor-het"), - Topology("rep-vol-ver-hom"), - Topology("rep-vol-ver-het"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-hor-het"), - Topology("exp-vol-ver-hom"), - Topology("exp-vol-ver-het") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt deleted file mode 100644 index bdb33f59..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreHpcPortfolio.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.* -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] to explore the effect of HPC workloads. - */ -public class MoreHpcPortfolio : Portfolio("more_hpc") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt deleted file mode 100644 index 733dabf6..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/MoreVelocityPortfolio.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). - */ -public class MoreVelocityPortfolio : Portfolio("more_velocity") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("rep-vel-ver-hom"), - Topology("rep-vel-ver-het"), - Topology("exp-vel-ver-hom"), - Topology("exp-vel-ver-het") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt deleted file mode 100644 index 66b94faf..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/OperationalPhenomenaPortfolio.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of operational phenomena on metrics. - */ -public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "mem", - "mem-inv", - "core-mem", - "core-mem-inv", - "active-servers", - "active-servers-inv", - "random" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt deleted file mode 100644 index 4a82ad56..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Portfolio.kt +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import mu.KotlinLogging -import org.opendc.compute.simulator.allocation.* -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor -import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.PerformanceInterferenceModelReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import java.io.File -import java.util.concurrent.ConcurrentHashMap -import kotlin.random.Random - -/** - * A portfolio represents a collection of scenarios are tested for the work. - * - * @param name The name of the portfolio. - */ -public abstract class Portfolio(name: String) : Experiment(name) { - /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** - * The path to where the environments are located. - */ - private val environmentPath by anyOf(File("environments/")) - - /** - * The path to where the traces are located. - */ - private val tracePath by anyOf(File("traces/")) - - /** - * The path to where the output results should be written. - */ - private val outputPath by anyOf(File("results/")) - - /** - * The path to the original VM placements file. - */ - private val vmPlacements by anyOf(emptyMap()) - - /** - * The path to the performance interference model. - */ - private val performanceInterferenceModel by anyOf(null) - - /** - * The topology to test. - */ - public abstract val topology: Topology - - /** - * The workload to test. - */ - public abstract val workload: Workload - - /** - * The operational phenomenas to consider. - */ - public abstract val operationalPhenomena: OperationalPhenomena - - /** - * The allocation policies to consider. - */ - public abstract val allocationPolicy: String - - /** - * A map of trace readers. - */ - private val traceReaders = ConcurrentHashMap() - - /** - * Perform a single trial for this portfolio. - */ - @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - val seeder = Random(repeat) - val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) - - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = createAllocationPolicy(seeder) - - val workload = workload - val workloadNames = if (workload is CompositeWorkload) { - workload.workloads.map { it.name } - } else { - listOf(workload.name) - } - - val rawReaders = workloadNames.map { workloadName -> - traceReaders.computeIfAbsent(workloadName) { - logger.info { "Loading trace $workloadName" } - Sc20RawParquetTraceReader(File(tracePath, workloadName)) - } - } - - val performanceInterferenceModel = performanceInterferenceModel - ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) - - val monitor = ParquetExperimentMonitor( - outputPath, - "portfolio_id=$name/scenario_id=$id/run_id=$repeat", - 4096 - ) - - testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( - this, - clock, - environment, - allocationPolicy, - tracer - ) - - val failureDomain = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFailureDomain( - this, - clock, - seeder.nextInt(), - operationalPhenomena.failureFrequency, - bareMetalProvisioner, - chan - ) - } else { - null - } - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - } - - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } - } - - /** - * Create the [AllocationPolicy] instance to use for the trial. - */ - private fun createAllocationPolicy(seeder: Random): AllocationPolicy { - return when (allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt deleted file mode 100644 index 8a42a3b4..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ReplayPortfolio.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that compares the original VM placements against our policies. - */ -public class ReplayPortfolio : Portfolio("replay") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "replay", - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt deleted file mode 100644 index 2210fc97..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/TestPortfolio.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment - -import org.opendc.experiments.sc20.experiment.model.OperationalPhenomena -import org.opendc.experiments.sc20.experiment.model.Topology -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] to perform a simple test run. - */ -public class TestPortfolio : Portfolio("test") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf("active-servers") -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt deleted file mode 100644 index b22f4c9e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/OperationalPhenomena.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.model - -/** - * Operation phenomena during experiments. - * - * @param failureFrequency The average time between failures in hours. - * @param hasInterference A flag to enable performance interference between VMs. - */ -public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt deleted file mode 100644 index 95062fda..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Topology.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.model - -/** - * The topology topology on which we test the workload. - */ -public data class Topology(val name: String) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt deleted file mode 100644 index ba95f544..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/model/Workload.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.model - -public enum class SamplingStrategy { - REGULAR, - HPC, - HPC_LOAD -} - -/** - * A workload that is considered for a scenario. - */ -public open class Workload( - public open val name: String, - public val fraction: Double, - public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR -) - -/** - * A workload that is composed of multiple workloads. - */ -public class CompositeWorkload(override val name: String, public val workloads: List, public val totalLoad: Double) : - Workload(name, -1.0) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt deleted file mode 100644 index 18ba2c33..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.monitor - -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import java.io.Closeable - -/** - * A monitor watches the events of an experiment. - */ -public interface ExperimentMonitor : Closeable { - /** - * This method is invoked when the state of a VM changes. - */ - public fun reportVmStateChange(time: Long, server: Server) {} - - /** - * This method is invoked when the state of a host changes. - */ - public fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) { - } - - /** - * Report the power consumption of a host. - */ - public fun reportPowerConsumption(host: Server, draw: Double) {} - - /** - * This method is invoked for a host for each slice that is finishes. - */ - public fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long = 5 * 60 * 1000L - ) { - } - - /** - * This method is invoked for a provisioner event. - */ - public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt deleted file mode 100644 index 3eb9362c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.experiment.monitor - -import mu.KotlinLogging -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.experiments.sc20.telemetry.HostEvent -import org.opendc.experiments.sc20.telemetry.ProvisionerEvent -import org.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter -import org.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File - -/** - * The logger instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * An [ExperimentMonitor] that logs the events to a Parquet file. - */ -public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { - private val hostWriter = ParquetHostEventWriter( - File(base, "host-metrics/$partition/data.parquet"), - bufferSize - ) - private val provisionerWriter = ParquetProvisionerEventWriter( - File(base, "provisioner-metrics/$partition/data.parquet"), - bufferSize - ) - private val currentHostEvent = mutableMapOf() - private var startTime = -1L - - override fun reportVmStateChange(time: Long, server: Server) { - if (startTime < 0) { - startTime = time - - // Update timestamp of initial event - currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) } - } - } - - override fun reportHostStateChange( - time: Long, - driver: VirtDriver, - server: Server - ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } - - val previousEvent = currentHostEvent[server] - - val roundedTime = previousEvent?.let { - val duration = time - it.timestamp - val k = 5 * 60 * 1000L // 5 min in ms - val rem = duration % k - - if (rem == 0L) { - time - } else { - it.timestamp + duration + k - rem - } - } ?: time - - reportHostSlice( - roundedTime, - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server - ) - } - - private val lastPowerConsumption = mutableMapOf() - - override fun reportPowerConsumption(host: Server, draw: Double) { - lastPowerConsumption[host] = draw - } - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long - ) { - val previousEvent = currentHostEvent[hostServer] - when { - previousEvent == null -> { - val event = HostEvent( - time, - 5 * 60 * 1000L, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount - ) - - currentHostEvent[hostServer] = event - } - previousEvent.timestamp == time -> { - val event = HostEvent( - time, - previousEvent.duration, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount - ) - - currentHostEvent[hostServer] = event - } - else -> { - hostWriter.write(previousEvent) - - val event = HostEvent( - time, - time - previousEvent.timestamp, - hostServer, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount - ) - - currentHostEvent[hostServer] = event - } - } - } - - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { - provisionerWriter.write( - ProvisionerEvent( - time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount - ) - ) - } - - override fun close() { - // Flush remaining events - for ((_, event) in currentHostEvent) { - hostWriter.write(event) - } - currentHostEvent.clear() - - hostWriter.close() - provisionerWriter.close() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt deleted file mode 100644 index 38a4a227..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/Event.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -/** - * An event that occurs within the system. - */ -public abstract class Event(public val name: String) { - /** - * The time of occurrence of this event. - */ - public abstract val timestamp: Long -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt deleted file mode 100644 index 042d204e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/HostEvent.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -import org.opendc.compute.core.Server - -/** - * A periodic report of the host machine metrics. - */ -public data class HostEvent( - override val timestamp: Long, - public val duration: Long, - public val host: Server, - public val vmCount: Int, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val powerDraw: Double, - public val cores: Int -) : Event("host-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt deleted file mode 100644 index d063bc54..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/ProvisionerEvent.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -/** - * A periodic report of the provisioner's metrics. - */ -public data class ProvisionerEvent( - override val timestamp: Long, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int -) : Event("provisioner-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt deleted file mode 100644 index 4f4706f0..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/RunEvent.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -import org.opendc.experiments.sc20.experiment.Portfolio - -/** - * A periodic report of the host machine metrics. - */ -public data class RunEvent( - val portfolio: Portfolio, - val repeat: Int, - override val timestamp: Long -) : Event("run") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt deleted file mode 100644 index c18069c9..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/VmEvent.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry - -import org.opendc.compute.core.Server - -/** - * A periodic report of a virtual machine's metrics. - */ -public data class VmEvent( - override val timestamp: Long, - public val duration: Long, - public val vm: Server, - public val host: Server, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double -) : Event("vm-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt deleted file mode 100644 index 79bc23db..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.sc20.telemetry.Event -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * The logging instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * A writer that writes events in Parquet format. - */ -public open class ParquetEventWriter( - private val path: File, - private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { - /** - * The writer to write the Parquet file. - */ - private val writer = AvroParquetWriter.builder(Path(path.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = thread(start = false, name = "parquet-writer") { run() } - - /** - * Write the specified metrics to the database. - */ - public fun write(event: T) { - queue.put(Action.Write(event)) - } - - /** - * Signal the writer to stop. - */ - public override fun close() { - queue.put(Action.Stop) - writerThread.join() - } - - init { - writerThread.start() - } - - /** - * Start the writer thread. - */ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.event as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - public data class Write(val event: T) : Action() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt deleted file mode 100644 index bd4eae37..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.sc20.telemetry.HostEvent -import java.io.File - -/** - * A Parquet event writer for [HostEvent]s. - */ -public class ParquetHostEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "host-writer" - - public companion object { - private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> - // record.put("portfolio_id", event.run.parent.parent.id) - // record.put("scenario_id", event.run.parent.id) - // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) - record.put("timestamp", event.timestamp) - record.put("duration", event.duration) - record.put("vm_count", event.vmCount) - record.put("requested_burst", event.requestedBurst) - record.put("granted_burst", event.grantedBurst) - record.put("overcommissioned_burst", event.overcommissionedBurst) - record.put("interfered_burst", event.interferedBurst) - record.put("cpu_usage", event.cpuUsage) - record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw * (1.0 / 12)) - record.put("cores", event.cores) - } - - private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - // .name("portfolio_id").type().intType().noDefault() - // .name("scenario_id").type().intType().noDefault() - // .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("cores").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt deleted file mode 100644 index 5d53a7bb..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.sc20.telemetry.ProvisionerEvent -import java.io.File - -/** - * A Parquet event writer for [ProvisionerEvent]s. - */ -public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "provisioner-writer" - - public companion object { - private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> - record.put("timestamp", event.timestamp) - record.put("host_total_count", event.totalHostCount) - record.put("host_available_count", event.availableHostCount) - record.put("vm_total_count", event.totalVmCount) - record.put("vm_active_count", event.activeVmCount) - record.put("vm_inactive_count", event.inactiveVmCount) - record.put("vm_waiting_count", event.waitingVmCount) - record.put("vm_failed_count", event.failedVmCount) - } - - private val schema: Schema = SchemaBuilder - .record("provisioner_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt deleted file mode 100644 index b50a698c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.sc20.telemetry.RunEvent -import java.io.File - -/** - * A Parquet event writer for [RunEvent]s. - */ -public class ParquetRunEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "run-writer" - - public companion object { - private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> - val portfolio = event.portfolio - record.put("portfolio_name", portfolio.name) - record.put("scenario_id", portfolio.id) - record.put("run_id", event.repeat) - record.put("topology", portfolio.topology.name) - record.put("workload_name", portfolio.workload.name) - record.put("workload_fraction", portfolio.workload.fraction) - record.put("workload_sampler", portfolio.workload.samplingStrategy) - record.put("allocation_policy", portfolio.allocationPolicy) - record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) - record.put("interference", portfolio.operationalPhenomena.hasInterference) - record.put("seed", event.repeat) - } - - private val schema: Schema = SchemaBuilder - .record("runs") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("portfolio_name").type().stringType().noDefault() - .name("scenario_id").type().intType().noDefault() - .name("run_id").type().intType().noDefault() - .name("topology").type().stringType().noDefault() - .name("workload_name").type().stringType().noDefault() - .name("workload_fraction").type().doubleType().noDefault() - .name("workload_sampler").type().stringType().noDefault() - .name("allocation_policy").type().stringType().noDefault() - .name("failure_frequency").type().doubleType().noDefault() - .name("interference").type().booleanType().noDefault() - .name("seed").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt deleted file mode 100644 index d735ea4b..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.util.TreeSet - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param reader The internal trace reader to use. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - * @param run The run to which this reader belongs. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20ParquetTraceReader( - rawReaders: List, - performanceInterferenceModel: Map, - workload: Workload, - seed: Int -) : TraceReader { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator> = - rawReaders - .map { it.read() } - .run { - if (workload is CompositeWorkload) { - this.zip(workload.workloads) - } else { - this.zip(listOf(workload)) - } - } - .map { sampleWorkload(it.first, workload, it.second, seed) } - .flatten() - .run { - // Apply performance interference model - if (performanceInterferenceModel.isEmpty()) - this - else { - map { entry -> - val image = entry.workload.image - val id = image.name - val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - - val newImage = - SimWorkloadImage( - image.uid, - image.name, - image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - (image as SimWorkloadImage).workload - ) - val newWorkload = entry.workload.copy(image = newImage) - Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) - } - } - } - .iterator() - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt deleted file mode 100644 index 4a318df4..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.core.User -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import java.io.File -import java.util.UUID - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param path The directory of the traces. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20RawParquetTraceReader(private val path: File) { - /** - * Read the fragments into memory. - */ - private fun parseFragments(path: File): Map> { - val reader = AvroParquetReader.builder(Path(path.absolutePath, "trace.parquet")) - .disableCompatibility() - .build() - - val fragments = mutableMapOf>() - - return try { - while (true) { - val record = reader.read() ?: break - - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - fragments.getOrPut(id) { mutableListOf() }.add(fragment) - } - - fragments - } finally { - reader.close() - } - } - - /** - * Read the metadata into a workload. - */ - private fun parseMeta(path: File, fragments: Map>): List { - val metaReader = AvroParquetReader.builder(Path(path.absolutePath, "meta.parquet")) - .disableCompatibility() - .build() - - var counter = 0 - val entries = mutableListOf() - - return try { - while (true) { - val record = metaReader.read() ?: break - - val id = record["id"].toString() - if (!fragments.containsKey(id)) { - continue - } - - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val vmWorkload = VmWorkload( - uid, - id, - UnnamedUser, - SimWorkloadImage( - uid, - id, - mapOf( - "submit-time" to submissionTime, - "end-time" to endTime, - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(vmFragments) - ) - ) - entries.add(TraceEntryImpl(submissionTime, vmWorkload)) - } - - entries - } catch (e: Exception) { - e.printStackTrace() - throw e - } finally { - metaReader.close() - } - } - - /** - * The entries in the trace. - */ - private val entries: List - - init { - val fragments = parseFragments(path) - entries = parseMeta(path, fragments) - } - - /** - * Read the entries in the trace. - */ - public fun read(): List> = entries - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - internal data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt deleted file mode 100644 index ba22ae15..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.core.User -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20StreamingParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List, - random: Random -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf() - val entries = mutableMapOf() - val buffers = mutableMapOf>>() - - val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info("Processing VM $id") - - val internalBuffer = mutableListOf() - val externalBuffer = mutableListOf() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) - val vmWorkload = VmWorkload( - uid, - "VM Workload $id", - UnnamedUser, - SimWorkloadImage( - uid, - id, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(fragments), - ) - ) - - TraceEntryImpl( - submissionTime, - vmWorkload - ) - } - .sortedBy { it.submissionTime } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt deleted file mode 100644 index 9a9598e7..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.options.split -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.long -import me.tongfei.progressbar.ProgressBar -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.sc20.Sc20VmPlacementReader -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.Random -import kotlin.math.max -import kotlin.math.min - -/** - * Represents the command for converting traces - */ -public class TraceConverterCli : CliktCommand(name = "trace-converter") { - /** - * The directory where the trace should be stored. - */ - private val outputPath by option("-O", "--output", help = "path to store the trace") - .file(canBeFile = false, mustExist = false) - .defaultLazy { File("output") } - - /** - * The directory where the input trace is located. - */ - private val inputPath by argument("input", help = "path to the input trace") - .file(canBeFile = false) - - /** - * The input type of the trace. - */ - private val type by option("-t", "--type", help = "input type of trace").groupChoice( - "solvinity" to SolvinityConversion(), - "bitbrains" to BitbrainsConversion(), - "azure" to AzureConversion() - ) - - override fun run() { - val metaSchema = SchemaBuilder - .record("meta") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("submissionTime").type().longType().noDefault() - .name("endTime").type().longType().noDefault() - .name("maxCores").type().intType().noDefault() - .name("requiredMemory").type().longType().noDefault() - .endRecord() - val schema = SchemaBuilder - .record("trace") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("cores").type().intType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("flops").type().longType().noDefault() - .endRecord() - - val metaParquet = File(outputPath, "meta.parquet") - val traceParquet = File(outputPath, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI())) - .withSchema(metaSchema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - val writer = AvroParquetWriter.builder(Path(traceParquet.toURI())) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - try { - val type = type ?: throw IllegalArgumentException("Invalid trace conversion") - val allFragments = type.read(inputPath, metaSchema, metaWriter) - allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) - - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) - - writer.write(record) - } - } finally { - writer.close() - metaWriter.close() - } - } -} - -/** - * The supported trace conversions. - */ -public sealed class TraceConversion(name: String) : OptionGroup(name) { - /** - * Read the fragments of the trace. - */ - public abstract fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList -} - -public class SolvinityConversion : TraceConversion("Solvinity") { - private val clusters by option() - .split(",") - - private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") - .file(canBeDir = false) - .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } - .required() - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val clusters = clusters?.toSet() ?: emptySet() - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - // Identify start time of the entire trace - var minTimestamp = Long.MAX_VALUE - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach file@{ vmFile -> - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val values = line.split("\t") - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - if (timestamp < minTimestamp) { - minTimestamp = timestamp - } - return@file - } - } - } - } - - println("Start of trace at $minTimestamp") - - val allFragments = mutableListOf() - - val begin = 15 * 24 * 60 * 60 * 1000L - val end = 45 * 24 * 60 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split("\t") - - vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val timestamp = - (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp - if (begin > timestamp || timestamp > end) { - continue - } - - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - if (minTime in begin until end) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - } - - return allFragments - } -} - -/** - * Conversion of the Bitbrains public trace. - */ -public class BitbrainsConversion : TraceConversion("Bitbrains") { - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val timestampCol = 0 - val cpuUsageCol = 3 - val coreCol = 1 - val provisionedMemoryCol = 5 - val traceInterval = 5 * 60 * 1000L - - val allFragments = mutableListOf() - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .drop(1) - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(";\t") - - vmId = vmFile.name - - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - cores = values[coreCol].trim().toInt() - val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB - requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -/** - * Conversion of the Azure public VM trace. - */ -public class AzureConversion : TraceConversion("Azure") { - private val seed by option(help = "seed for trace sampling") - .long() - .default(0) - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val random = Random(seed) - val fraction = 0.01 - - // Read VM table - val vmIdTableCol = 0 - val coreTableCol = 9 - val provisionedMemoryTableCol = 10 - - var vmId: String - var cores: Int - var requiredMemory: Long - - val vmIds = mutableSetOf() - val vmIdToMetadata = mutableMapOf() - - BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> - reader.lineSequence() - .chunked(1024) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - // Sample only a fraction of the VMs - if (random.nextDouble() > fraction) { - continue - } - - val values = line.split(",") - - // Exclude VMs with a large number of cores (not specified exactly) - if (values[coreTableCol].contains(">")) { - continue - } - - vmId = values[vmIdTableCol].trim() - cores = values[coreTableCol].trim().toInt() - requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB - - vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) - } - } - } - - // Read VM metric reading files - val timestampCol = 0 - val vmIdCol = 1 - val cpuUsageCol = 4 - val traceInterval = 5 * 60 * 1000L - - val vmIdToFragments = mutableMapOf>() - val vmIdToLastFragment = mutableMapOf() - val allFragments = mutableListOf() - - for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { - val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") - var timestamp: Long - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(",") - vmId = values[vmIdCol].trim() - - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { - continue - } - - timestamp = values[timestampCol].trim().toLong() * 1000L - vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] - - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { - Fragment( - vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() - } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) - } - fragment - } - } - } - } - } - - for (entry in vmIdToLastFragment) { - if (entry.value != null) { - if (vmIdToFragments[entry.key] == null) { - vmIdToFragments[entry.key] = mutableListOf() - } - vmIdToFragments[entry.key]!!.add(entry.value!!) - } - } - - println("Read ${vmIdToLastFragment.size} VMs") - - for (entry in vmIdToMetadata) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.key) - metaRecord.put("submissionTime", entry.value.minTime) - metaRecord.put("endTime", entry.value.maxTime) - println("${entry.value.minTime} - ${entry.value.maxTime}") - metaRecord.put("maxCores", entry.value.cores) - metaRecord.put("requiredMemory", entry.value.requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -public data class Fragment( - public val id: String, - public val tick: Long, - public val flops: Long, - public val duration: Long, - public val usage: Double, - public val cores: Int -) - -public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) - -/** - * A script to convert a trace in text format into a Parquet trace. - */ -public fun main(args: Array): Unit = TraceConverterCli().main(args) diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt deleted file mode 100644 index a8b83aef..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20.trace - -import mu.KotlinLogging -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage -import org.opendc.experiments.sc20.experiment.model.CompositeWorkload -import org.opendc.experiments.sc20.experiment.model.SamplingStrategy -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.format.trace.TraceEntry -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Sample the workload for the specified [run]. - */ -public fun sampleWorkload( - trace: List>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List> { - return when { - workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) - workload.samplingStrategy == SamplingStrategy.HPC -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false) - workload.samplingStrategy == SamplingStrategy.HPC_LOAD -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true) - else -> - sampleRegularWorkload(trace, workload, workload, seed) - } -} - -/** - * Sample a regular (non-HPC) workload. - */ -public fun sampleRegularWorkload( - trace: List>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List> { - val fraction = subWorkload.fraction - - val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf>() - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } - } - var currentLoad = 0.0 - - for (entry in shuffled) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a HPC workload. - */ -public fun sampleHpcWorkload( - trace: List>, - workload: Workload, - seed: Int, - sampleOnLoad: Boolean -): List> { - val pattern = Regex("^vm__workload__(ComputeNode|cn).*") - val random = Random(seed) - - val fraction = workload.fraction - val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.workload.image.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf>() - hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf>() - nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } - - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } - } - - logger.debug { "Total trace load: $totalLoad" } - var hpcCount = 0 - var hpcLoad = 0.0 - var nonHpcCount = 0 - var nonHpcLoad = 0.0 - - val res = mutableListOf>() - - if (sampleOnLoad) { - var currentLoad = 0.0 - for (entry in hpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - hpcLoad += entryLoad - hpcCount += 1 - currentLoad += entryLoad - res += entry - } - - for (entry in nonHpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1) { - break - } - - nonHpcLoad += entryLoad - nonHpcCount += 1 - currentLoad += entryLoad - res += entry - } - } else { - hpcSequence - .take((fraction * trace.size).toInt()) - .forEach { entry -> - hpcLoad += entry.workload.image.tags.getValue("total-load") as Double - hpcCount += 1 - res.add(entry) - } - - nonHpcSequence - .take(((1 - fraction) * trace.size).toInt()) - .forEach { entry -> - nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double - nonHpcCount += 1 - res.add(entry) - } - } - - logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } - logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a random trace entry. - */ -private fun sample(entry: TraceEntry, i: Int): TraceEntry { - val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = SimWorkloadImage( - id, - entry.workload.image.name, - entry.workload.image.tags, - (entry.workload.image as SimWorkloadImage).workload - ) - val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) - return VmTraceEntry(vmWorkload, entry.submissionTime) -} - -private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : - TraceEntry diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml deleted file mode 100644 index 8029092e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ /dev/null @@ -1,49 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt deleted file mode 100644 index c5ad345d..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc20 - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Server -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy -import org.opendc.experiments.sc20.experiment.attachMonitor -import org.opendc.experiments.sc20.experiment.createFailureDomain -import org.opendc.experiments.sc20.experiment.createProvisioner -import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor -import org.opendc.experiments.sc20.experiment.processTrace -import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader -import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import java.io.File -import java.time.Clock - -/** - * An integration test suite for the SC20 experiments. - */ -@OptIn(ExperimentalCoroutinesApi::class) -class Sc20IntegrationTest { - /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - - /** - * The monitor used to keep track of the metrics. - */ - private lateinit var monitor: TestExperimentReporter - - /** - * Setup the experimental environment. - */ - @BeforeEach - fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - - monitor = TestExperimentReporter() - } - - /** - * Tear down the experimental environment. - */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - - @Test - fun testLarge() { - val failures = false - val seed = 0 - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() - val traceReader = createTestTraceReader() - val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: SimVirtProvisioningService - val tracer = EventTracer(clock) - - testScope.launch { - val res = createProvisioner( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) - val bareMetalProvisioner = res.first - scheduler = res.second - - val failureDomain = if (failures) { - println("ENABLING failures") - createFailureDomain( - this, - clock, - seed, - 24.0 * 7, - bareMetalProvisioner, - chan - ) - } else { - null - } - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - monitor.close() - } - - runSimulation() - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1684849230562, monitor.totalRequestedBurst) }, - { assertEquals(447612683996, monitor.totalGrantedBurst) }, - { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, - { assertEquals(0, monitor.totalInterferedBurst) } - ) - } - - @Test - fun testSmall() { - val seed = 1 - val chan = Channel(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() - val traceReader = createTestTraceReader(0.5, seed) - val environmentReader = createTestEnvironmentReader("single") - lateinit var scheduler: SimVirtProvisioningService - val tracer = EventTracer(clock) - - testScope.launch { - val res = createProvisioner( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) - scheduler = res.second - - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - scheduler.terminate() - monitor.close() - } - - runSimulation() - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } - ) - } - - /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - - /** - * Obtain the trace reader for the test. - */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader { - return Sc20ParquetTraceReader( - listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), - emptyMap(), - Workload("test", fraction), - seed - ) - } - - /** - * Obtain the environment reader for the test. - */ - private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") - return Sc20ClusterEnvironmentReader(stream) - } - - class TestExperimentReporter : ExperimentMonitor { - var totalRequestedBurst = 0L - var totalGrantedBurst = 0L - var totalOvercommissionedBurst = 0L - var totalInterferedBurst = 0L - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - numberOfDeployedImages: Int, - hostServer: Server, - duration: Long - ) { - totalRequestedBurst += requestedBurst - totalGrantedBurst += grantedBurst - totalOvercommissionedBurst += overcommissionedBurst - totalInterferedBurst += interferedBurst - } - - override fun close() {} - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt deleted file mode 100644 index 53b3c2d7..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/single.txt +++ /dev/null @@ -1,3 +0,0 @@ -ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost -A01;A01;8;3.2;64;1;64;8 - diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt deleted file mode 100644 index 6b347bff..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/env/topology.txt +++ /dev/null @@ -1,5 +0,0 @@ -ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost -A01;A01;32;3.2;2048;1;256;32 -B01;B01;48;2.93;1256;6;64;8 -C01;C01;32;3.2;2048;2;128;16 - diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet deleted file mode 100644 index ce7a812c..00000000 Binary files a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/meta.parquet and /dev/null differ diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet deleted file mode 100644 index 1d7ce882..00000000 Binary files a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/resources/trace/trace.parquet and /dev/null differ -- cgit v1.2.3 From 15fcd1a10018605f59ca7a644b8f3b3960e7b6b0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 23 Feb 2021 21:40:53 +0100 Subject: Use Java Platform for shared dependency constraints This change uses the Java Platform functionality from Gradle to enable shared dependency constraints across modules. --- simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts | 1 + simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts | 1 + 2 files changed, 2 insertions(+) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 041cf36a..636f291c 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -30,6 +30,7 @@ plugins { } dependencies { + api(platform(project(":opendc-platform"))) api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index f4d947d8..00aa0395 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -29,6 +29,7 @@ plugins { } dependencies { + api(platform(project(":opendc-platform"))) api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) -- cgit v1.2.3