summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:33:05 +0200
committerGitHub <noreply@github.com>2021-09-19 14:33:05 +0200
commit453c25c4b453fa0af26bebbd8863abfb79218119 (patch)
tree7977e81ec34ce7f57d8b14a717ccb63f54cd03cb
parentc1b9719aad10566c9d17f9eb757236c58a602b89 (diff)
parent76a0f8889a4990108bc7906556dec6381647404b (diff)
merge: Enable re-use of virtual machine workload helpers
This pull request enables re-use of virtual machine workload helpers by extracting the helpers into a separate module which may be used by other experiments. - Support workload/machine CPU count mismatch - Extract common code out of Capelin experiments - Support flexible topology creation - Add option for optimizing SimHost simulation - Support creating CPU-optimized topology - Make workload sampling model extensible - Add support for extended Bitbrains trace format - Add support for Azure VM trace format - Add support for internal OpenDC VM trace format - Optimize OpenDC VM trace format - Add tool for converting workload traces - Remove dependency on SnakeYaml **Breaking API Changes** - `RESOURCE_NCPU` and `RESOURCE_STATE_NCPU` are renamed to `RESOURCE_CPU_COUNT` and `RESOURCE_STATE_CPU_COUNT` respectively.
-rw-r--r--gradle/libs.versions.toml3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt24
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt2
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts (renamed from opendc-experiments/opendc-experiments-radice/build.gradle.kts)22
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt)17
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt)91
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt)145
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt62
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt)7
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt)41
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt)35
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt)8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt)4
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt)9
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt)10
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt)7
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt66
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt143
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt61
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt37
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt48
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt)12
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt36
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt)16
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt)2
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json)0
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt54
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt120
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt23
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt46
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt121
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt62
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt261
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt198
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt55
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt)2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt126
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquetbin0 -> 2099 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquetbin0 -> 1125930 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin2081 -> 0 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin1647189 -> 0 bytes
-rw-r--r--opendc-simulator/opendc-simulator-compute/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt3
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt2
-rw-r--r--opendc-trace/opendc-trace-azure/build.gradle.kts (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt)26
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt)6
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt)4
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt)4
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt)11
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt)4
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt)6
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt113
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/resources/trace/vm_cpu_readings/vm_cpu_readings-file-1-of-125.csv100
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/resources/trace/vmtable/vmtable.csv10
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt)14
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt)12
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt)8
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt)10
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt4
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt6
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt94
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/resources/vm.txt2
-rw-r--r--opendc-trace/opendc-trace-opendc/build.gradle.kts39
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt)10
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt)62
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt)12
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt)59
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt)12
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt82
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt121
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquetbin0 -> 1582 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquetbin0 -> 83524 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquetbin0 -> 1679 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquetbin0 -> 65174 bytes
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt)8
-rw-r--r--opendc-trace/opendc-trace-tools/build.gradle.kts (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt)42
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt)107
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt145
-rw-r--r--settings.gradle.kts4
-rw-r--r--traces/bitbrains-small/meta.parquetbin2140 -> 2099 bytes
-rw-r--r--traces/bitbrains-small/trace.parquetbin1610917 -> 1125930 bytes
97 files changed, 2008 insertions, 1332 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 3f0e180b..82da905c 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -20,7 +20,6 @@ parquet = "1.12.0"
progressbar = "0.9.0"
sentry = "5.1.2"
slf4j = "1.7.32"
-yaml = "1.29"
[libraries]
# Kotlin
@@ -53,11 +52,11 @@ progressbar = { module = "me.tongfei:progressbar", version.ref = "progressbar" }
# Format
jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson" }
+jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" }
parquet = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" }
-yaml = { module = "org.yaml:snakeyaml", version.ref = "yaml" }
config = { module = "com.typesafe:config", version.ref = "config" }
# HTTP client
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 793db907..ff55c585 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -68,7 +68,8 @@ public class SimHost(
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
- interferenceDomain: VmInterferenceDomain? = null
+ interferenceDomain: VmInterferenceDomain? = null,
+ private val optimize: Boolean = false
) : Host, AutoCloseable {
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
@@ -98,7 +99,7 @@ public class SimHost(
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model.optimize(), powerDriver)
/**
* The hypervisor to run multiple workloads.
@@ -319,6 +320,25 @@ public class SimHost(
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+ return MachineModel(processingUnits, memoryUnits).optimize()
+ }
+
+ /**
+ * Optimize the [MachineModel] for simulation.
+ */
+ private fun MachineModel.optimize(): MachineModel {
+ if (!optimize) {
+ return this
+ }
+
+ val originalCpu = cpus[0]
+ val freq = cpus.sumOf { it.frequency }
+ val processingNode = originalCpu.node.copy(coreCount = 1)
+ val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode))
+
+ val memorySize = memory.sumOf { it.size }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
return MachineModel(processingUnits, memoryUnits)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
index 87903623..fcd9dd7e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -24,8 +24,8 @@ package org.opendc.compute.simulator.failure
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
+import java.util.*
import kotlin.math.roundToInt
-import kotlin.random.Random
/**
* A [VictimSelector] that stochastically selects a set of hosts to be failed.
diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
index 0c716183..e82cf203 100644
--- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -20,28 +20,28 @@
* SOFTWARE.
*/
-description = "Experiments for the Risk Analysis work"
+description = "Support library for simulating VM-based workloads with OpenDC"
/* Build configuration */
plugins {
- `experiment-conventions`
+ `kotlin-library-conventions`
`testing-conventions`
}
dependencies {
api(platform(projects.opendcPlatform))
- api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
+ api(projects.opendcCompute.opendcComputeSimulator)
+
+ implementation(projects.opendcTrace.opendcTraceOpendc)
+ implementation(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
+ implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
- implementation(libs.config)
- implementation(libs.progressbar)
- implementation(libs.clikt)
-
- implementation(libs.parquet)
- testImplementation(libs.log4j.slf4j)
+ implementation(libs.jackson.databind)
+ implementation(libs.jackson.module.kotlin)
+ implementation(kotlin("reflect"))
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index 08304edc..78002c2f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -20,13 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.compute.workload
+
+import java.util.*
/**
- * An interface for reading workloads into memory.
- *
- * This interface must guarantee that the entries are delivered in order of submission time.
- *
- * @param T The shape of the workloads supported by this reader.
+ * An interface that describes how a workload is resolved.
*/
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
+public interface ComputeWorkload {
+ /**
+ * Resolve the workload into a list of [VirtualMachine]s to simulate.
+ */
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index ca937328..c92b212f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,30 +20,42 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.compute.workload
-import org.opendc.experiments.capelin.trace.bp.BPTraceFormat
+import mu.KotlinLogging
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.trace.*
+import org.opendc.trace.opendc.OdcVmTraceFormat
import java.io.File
-import java.util.UUID
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.math.roundToLong
/**
- * A [TraceReader] for the internal VM workload trace format.
+ * A helper class for loading compute workload traces into memory.
*
- * @param path The directory of the traces.
+ * @param baseDir The directory containing the traces.
*/
-class RawParquetTraceReader(private val path: File) {
+public class ComputeWorkloadLoader(private val baseDir: File) {
/**
- * The [Trace] that represents this trace.
+ * The logger for this instance.
*/
- private val trace = BPTraceFormat().open(path.toURI().toURL())
+ private val logger = KotlinLogging.logger {}
+
+ /**
+     * The [OdcVmTraceFormat] instance used to load the traces.
+ */
+ private val format = OdcVmTraceFormat()
+
+ /**
+ * The cache of workloads.
+ */
+ private val cache = ConcurrentHashMap<String, List<VirtualMachine>>()
/**
* Read the fragments into memory.
*/
- private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
+ private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
@@ -53,7 +65,7 @@ class RawParquetTraceReader(private val path: File) {
val id = reader.get(RESOURCE_STATE_ID)
val time = reader.get(RESOURCE_STATE_TIMESTAMP)
val duration = reader.get(RESOURCE_STATE_DURATION)
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT)
val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val fragment = SimTraceWorkload.Fragment(
@@ -63,7 +75,7 @@ class RawParquetTraceReader(private val path: File) {
cores
)
- fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment)
}
fragments
@@ -75,11 +87,11 @@ class RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
var counter = 0
- val entries = mutableListOf<TraceEntry<SimWorkload>>()
+ val entries = mutableListOf<VirtualMachine>()
return try {
while (reader.nextRow()) {
@@ -91,28 +103,30 @@ class RawParquetTraceReader(private val path: File) {
val submissionTime = reader.get(RESOURCE_START_TIME)
val endTime = reader.get(RESOURCE_STOP_TIME)
- val maxCores = reader.getInt(RESOURCE_NCPUS)
+ val maxCores = reader.getInt(RESOURCE_CPU_COUNT)
val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val workload = SimTraceWorkload(vmFragments)
+ val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs
+
entries.add(
- TraceEntry(
- uid, id, submissionTime.toEpochMilli(), workload,
- mapOf(
- "submit-time" to submissionTime.toEpochMilli(),
- "end-time" to endTime.toEpochMilli(),
- "total-load" to totalLoad,
- "cores" to maxCores,
- "required-memory" to requiredMemory.toLong(),
- "workload" to workload
- )
+ VirtualMachine(
+ uid,
+ id,
+ maxCores,
+ requiredMemory.roundToLong(),
+ totalLoad,
+ submissionTime,
+ endTime,
+ vmFragments
)
)
}
+ // Make sure the virtual machines are ordered by start time
+ entries.sortBy { it.startTime }
+
entries
} catch (e: Exception) {
e.printStackTrace()
@@ -123,17 +137,24 @@ class RawParquetTraceReader(private val path: File) {
}
/**
- * The entries in the trace.
+ * Load the trace with the specified [name].
*/
- private val entries: List<TraceEntry<SimWorkload>>
+ public fun get(name: String): List<VirtualMachine> {
+ return cache.computeIfAbsent(name) {
+ val path = baseDir.resolve(it)
+
+ logger.info { "Loading trace $it at $path" }
- init {
- val fragments = parseFragments()
- entries = parseMeta(fragments)
+ val trace = format.open(path.toURI().toURL())
+ val fragments = parseFragments(trace)
+ parseMeta(trace, fragments)
+ }
}
/**
- * Read the entries in the trace.
+ * Clear the workload cache.
*/
- fun read(): List<TraceEntry<SimWorkload>> = entries
+ public fun reset() {
+ cache.clear()
+ }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index 065a8c93..ed45bd8a 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.util
+package org.opendc.compute.workload
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
@@ -33,42 +33,42 @@ import kotlinx.coroutines.yield
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
-import org.opendc.experiments.capelin.env.MachineDef
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.compute.*
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
/**
- * Helper class to manage a [ComputeService] simulation.
+ * Helper class to simulate VM-based workloads in OpenDC.
+ *
+ * @param context [CoroutineContext] to run the simulation in.
+ * @param clock [Clock] instance tracking simulation time.
+ * @param scheduler [ComputeScheduler] implementation to use for the service.
+ * @param failureModel A failure model to use for injecting failures.
+ * @param interferenceModel The model to use for performance interference.
*/
-class ComputeServiceSimulator(
+public class ComputeWorkloadRunner(
private val context: CoroutineContext,
private val clock: Clock,
scheduler: ComputeScheduler,
- machines: List<MachineDef>,
private val failureModel: FailureModel? = null,
- interferenceModel: VmInterferenceModel? = null,
- hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
+ private val interferenceModel: VmInterferenceModel? = null,
) : AutoCloseable {
/**
* The [ComputeService] that has been configured by the manager.
*/
- val service: ComputeService
+ public val service: ComputeService
/**
* The [MetricProducer] that are used by the [ComputeService] and the simulated hosts.
*/
- val producers: List<MetricProducer>
+ public val producers: List<MetricProducer>
get() = _metricProducers
private val _metricProducers = mutableListOf<MetricProducer>()
@@ -86,20 +86,14 @@ class ComputeServiceSimulator(
val (service, serviceMeterProvider) = createService(scheduler)
this._metricProducers.add(serviceMeterProvider)
this.service = service
-
- for (def in machines) {
- val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
- this._metricProducers.add(hostMeterProvider)
- hosts.add(host)
- this.service.addHost(host)
- }
}
/**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*/
- suspend fun run(reader: TraceReader<SimWorkload>) {
- val injector = failureModel?.createInjector(context, clock, service)
+ public suspend fun run(trace: List<VirtualMachine>, seed: Long) {
+ val random = Random(seed)
+ val injector = failureModel?.createInjector(context, clock, service, random)
val client = service.newClient()
// Create new image for the virtual machine
@@ -112,35 +106,36 @@ class ComputeServiceSimulator(
var offset = Long.MIN_VALUE
- while (reader.hasNext()) {
- val entry = reader.next()
+ for (entry in trace.sortedBy { it.startTime }) {
+ val now = clock.millis()
+ val start = entry.startTime.toEpochMilli()
if (offset < 0) {
- offset = entry.start - clock.millis()
+ offset = start - now
}
// Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
+ assert(start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (start - offset) - now))
launch {
val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
val server = client.newServer(
entry.name,
image,
client.newFlavor(
entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
+ entry.cpuCount,
+ entry.memCapacity
),
- meta = entry.meta + mapOf("workload" to workload)
+ meta = mapOf("workload" to workload)
)
// Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
+ val endTime = entry.stopTime.toEpochMilli()
+ delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
// Delete the server after reaching the end-time of the virtual machine
server.delete()
@@ -151,11 +146,52 @@ class ComputeServiceSimulator(
yield()
} finally {
injector?.close()
- reader.close()
client.close()
}
}
+ /**
+ * Register a host for this simulation.
+ *
+ * @param spec The definition of the host.
+ * @param optimize Merge the CPU resources of the host into a single CPU resource.
+ * @return The [SimHost] that has been constructed by the runner.
+ */
+ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost {
+ val resource = Resource.builder()
+ .put(HOST_ID, spec.uid.toString())
+ .put(HOST_NAME, spec.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, spec.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size })
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+ _metricProducers.add(meterProvider)
+
+ val host = SimHost(
+ spec.uid,
+ spec.name,
+ spec.model,
+ spec.meta,
+ context,
+ interpreter,
+ meterProvider,
+ spec.hypervisor,
+ powerDriver = spec.powerDriver,
+ interferenceDomain = interferenceModel?.newDomain(),
+ optimize = optimize
+ )
+
+ hosts.add(host)
+ service.addHost(host)
+
+ return host
+ }
+
override fun close() {
service.close()
@@ -182,41 +218,4 @@ class ComputeServiceSimulator(
val service = ComputeService(context, clock, meterProvider, scheduler)
return service to meterProvider
}
-
- /**
- * Construct a [SimHost] instance for the specified [MachineDef].
- */
- private fun createHost(
- def: MachineDef,
- hypervisorProvider: SimHypervisorProvider,
- interferenceModel: VmInterferenceModel? = null
- ): Pair<SimHost, SdkMeterProvider> {
- val resource = Resource.builder()
- .put(HOST_ID, def.uid.toString())
- .put(HOST_NAME, def.name)
- .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(HOST_NCPUS, def.model.cpus.size)
- .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
-
- val host = SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- context,
- interpreter,
- meterProvider,
- hypervisorProvider,
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
-
- return host to meterProvider
- }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
new file mode 100644
index 00000000..f58ce587
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeWorkloads")
+package org.opendc.compute.workload
+
+import org.opendc.compute.workload.internal.CompositeComputeWorkload
+import org.opendc.compute.workload.internal.HpcSampledComputeWorkload
+import org.opendc.compute.workload.internal.LoadSampledComputeWorkload
+import org.opendc.compute.workload.internal.TraceComputeWorkload
+
+/**
+ * Construct a workload from a trace.
+ */
+public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name)
+
+/**
+ * Construct a composite workload with the specified fractions.
+ */
+public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload {
+ return CompositeComputeWorkload(pairs.toMap())
+}
+
+/**
+ * Sample a workload by a [fraction] of the total load.
+ */
+public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload {
+ return LoadSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC VMs (count).
+ */
+public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC load.
+ */
+public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction, sampleLoad = true)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
index 83393896..4d9ef15d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
@@ -20,19 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.util
+package org.opendc.compute.workload
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.failure.HostFaultInjector
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
/**
* Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
*/
-interface FailureModel {
+public interface FailureModel {
/**
* Construct a [HostFaultInjector] for the specified [service].
*/
- fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+ public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
index 89b4a31c..be7120b9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
@@ -21,7 +21,7 @@
*/
@file:JvmName("FailureModels")
-package org.opendc.experiments.capelin
+package org.opendc.compute.workload
import org.apache.commons.math3.distribution.LogNormalDistribution
import org.apache.commons.math3.random.Well19937c
@@ -30,12 +30,11 @@ import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.failure.HostFaultInjector
import org.opendc.compute.simulator.failure.StartStopHostFault
import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import org.opendc.experiments.capelin.util.FailureModel
import java.time.Clock
import java.time.Duration
+import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
-import kotlin.random.Random
/**
* Obtain a [FailureModel] based on the GRID'5000 failure trace.
@@ -43,14 +42,15 @@ import kotlin.random.Random
* This fault injector uses parameters from the GRID'5000 failure trace as described in
* "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
*/
-fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
+public fun grid5000(failureInterval: Duration): FailureModel {
return object : FailureModel {
override fun createInjector(
context: CoroutineContext,
clock: Clock,
- service: ComputeService
+ service: ComputeService,
+ random: Random
): HostFaultInjector {
- val rng = Well19937c(seed)
+ val rng = Well19937c(random.nextLong())
val hosts = service.hosts.map { it as SimHost }.toSet()
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
@@ -60,7 +60,7 @@ fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
clock,
hosts,
iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random),
fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
)
}
@@ -68,30 +68,3 @@ fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
override fun toString(): String = "Grid5000FailureModel"
}
}
-
-/**
- * Obtain the [HostFaultInjector] to use for the experiments.
- *
- * This fault injector uses parameters from the GRID'5000 failure trace as described in
- * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
- */
-fun createFaultInjector(
- context: CoroutineContext,
- clock: Clock,
- hosts: Set<SimHost>,
- seed: Int,
- failureInterval: Double
-): HostFaultInjector {
- val rng = Well19937c(seed)
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
index 303a6a8c..40484b68 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,23 +20,30 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.compute.workload
-import java.util.UUID
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import java.time.Instant
+import java.util.*
/**
- * An entry in a workload trace.
+ * A virtual machine workload.
*
- * @param uid The unique identifier of the entry.
- * @param name The name of the entry.
- * @param start The start time of the workload.
- * @param workload The workload of the entry.
- * @param meta The meta-data associated with the workload.
+ * @param uid The unique identifier of the virtual machine.
+ * @param name The name of the virtual machine.
+ * @param cpuCount The number of vCPUs in the VM.
+ * @param memCapacity The provisioned memory for the VM.
+ * @param startTime The start time of the VM.
+ * @param stopTime The stop time of the VM.
+ * @param trace The trace fragments that belong to this VM.
*/
-public data class TraceEntry<out T>(
+public data class VirtualMachine(
val uid: UUID,
val name: String,
- val start: Long,
- val workload: T,
- val meta: Map<String, Any>
+ val cpuCount: Int,
+ val memCapacity: Long,
+ val totalLoad: Double,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val trace: Sequence<SimTraceWorkload.Fragment>,
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index e3d15c3b..4172d729 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.export.parquet
+package org.opendc.compute.workload.export.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
@@ -39,7 +39,7 @@ import kotlin.concurrent.thread
/**
* A writer that writes data in Parquet format.
*/
-abstract class ParquetDataWriter<in T>(
+public abstract class ParquetDataWriter<in T>(
path: File,
private val schema: Schema,
bufferSize: Int = 4096
@@ -113,7 +113,7 @@ abstract class ParquetDataWriter<in T>(
/**
* Write the specified metrics to the database.
*/
- fun write(data: T) {
+ public fun write(data: T) {
val exception = exception
if (exception != null) {
throw IllegalStateException("Writer thread failed", exception)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt
index b057e932..f41a2241 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.export.parquet
+package org.opendc.compute.workload.export.parquet
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
@@ -31,7 +31,7 @@ import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
private val serverWriter = ParquetServerDataWriter(
File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 58388cb1..37066a0d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.export.parquet
+package org.opendc.compute.workload.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -29,6 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.telemetry.compute.table.HostData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.UUID_SCHEMA
+import org.opendc.trace.util.parquet.optional
import java.io.File
/**
@@ -74,7 +77,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
override fun toString(): String = "host-writer"
- companion object {
+ private companion object {
private val SCHEMA: Schema = SchemaBuilder
.record("host")
.namespace("org.opendc.telemetry.compute")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 43b5f469..bea23d32 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.export.parquet
+package org.opendc.compute.workload.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -29,8 +29,10 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.UUID_SCHEMA
+import org.opendc.trace.util.parquet.optional
import java.io.File
-import java.util.*
/**
* A Parquet event writer for [ServerData]s.
@@ -71,7 +73,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
override fun toString(): String = "server-writer"
- companion object {
+ private companion object {
private val SCHEMA: Schema = SchemaBuilder
.record("server")
.namespace("org.opendc.telemetry.compute")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index 2928f445..47824b29 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,12 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.export.parquet
+package org.opendc.compute.workload.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.io.File
/**
@@ -47,7 +48,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
override fun toString(): String = "service-writer"
- companion object {
+ private companion object {
private val SCHEMA: Schema = SchemaBuilder
.record("service")
.namespace("org.opendc.telemetry.compute")
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
new file mode 100644
index 00000000..9b2bec55
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
+ */
+internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
+
+ val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
+
+ val res = mutableListOf<VirtualMachine>()
+
+ for ((fraction, vms) in traces) {
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+ }
+
+ val vmCount = traces.sumOf { (_, vms) -> vms.size }
+ logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
new file mode 100644
index 00000000..52f4c672
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples HPC VMs in the workload.
+ *
+ * @param fraction The fraction of load/virtual machines to sample.
+ * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs.
+ */
+internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The pattern to match compute nodes in the workload.
+ */
+ private val pattern = Regex("^(ComputeNode|cn).*")
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+
+ val (hpc, nonHpc) = vms.partition { entry ->
+ val name = entry.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<VirtualMachine>()
+
+ if (sampleLoad) {
+ var currentLoad = 0.0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * vms.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.totalLoad
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * vms.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.totalLoad
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+
+ /**
+ * Sample a random trace entry.
+ */
+ private fun sample(entry: VirtualMachine, i: Int): VirtualMachine {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
new file mode 100644
index 00000000..ef6de729
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that is sampled based on total load.
+ */
+internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+ val res = mutableListOf<VirtualMachine>()
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
new file mode 100644
index 00000000..d657ff01
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] from a trace.
+ */
+internal class TraceComputeWorkload(val name: String) : ComputeWorkload {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ return loader.get(name)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
new file mode 100644
index 00000000..f3dc1e9e
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.topology
+
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerDriver
+import java.util.*
+
+/**
+ * Description of a physical host that will be simulated by OpenDC and host the virtual machines.
+ *
+ * @param uid Unique identifier of the host.
+ * @param name The name of the host.
+ * @param meta The metadata of the host.
+ * @param model The physical model of the machine.
+ * @param powerDriver The [PowerDriver] to model the power consumption of the machine.
+ * @param hypervisor The hypervisor implementation to use.
+ */
+public data class HostSpec(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerDriver: PowerDriver,
+ val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider()
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
index a968b043..3b8dc918 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
@@ -20,16 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.env
-
-import java.io.Closeable
+package org.opendc.compute.workload.topology
/**
- * An interface for reading descriptions of topology environments into memory.
+ * Representation of the environment of the compute service, describing the physical details of every host.
*/
-public interface EnvironmentReader : Closeable {
+public interface Topology {
/**
- * Read the environment into a list.
+ * Resolve the [Topology] into a list of [HostSpec]s.
*/
- public fun read(): List<MachineDef>
+ public fun resolve(): List<HostSpec>
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
new file mode 100644
index 00000000..74f9a1f8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TopologyHelpers")
+package org.opendc.compute.workload.topology
+
+import org.opendc.compute.workload.ComputeWorkloadRunner
+
+/**
+ * Apply the specified [topology] to the given [ComputeWorkloadRunner].
+ */
+public fun ComputeWorkloadRunner.apply(topology: Topology, optimize: Boolean = false) {
+ val hosts = topology.resolve()
+ for (spec in hosts) {
+ registerHost(spec, optimize)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
index 9549af42..67f9626c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
@@ -20,19 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.compute.workload.util
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
+import java.io.File
import java.io.InputStream
/**
* A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
*/
-class PerformanceInterferenceReader {
+public class PerformanceInterferenceReader {
/**
* The [ObjectMapper] to use.
*/
@@ -43,10 +44,17 @@ class PerformanceInterferenceReader {
}
/**
+ * Read the performance interference model from [file].
+ */
+ public fun read(file: File): List<VmInterferenceGroup> {
+ return mapper.readValue(file)
+ }
+
+ /**
+ * Read the performance interference model from the input.
*/
- fun read(input: InputStream): List<VmInterferenceGroup> {
- return input.use { mapper.readValue(input) }
+ public fun read(input: InputStream): List<VmInterferenceGroup> {
+ return mapper.readValue(input)
}
private data class GroupMixin(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
index fbc39b87..c79f0584 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.compute.workload.util
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
index 1be5852b..1be5852b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json
+++ b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 7dadd14d..4bcbaf61 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -31,6 +31,8 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
+ api(projects.opendcCompute.opendcComputeWorkload)
+
implementation(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcTrace.opendcTraceBitbrains)
implementation(projects.opendcSimulator.opendcSimulatorCore)
@@ -38,16 +40,14 @@ dependencies {
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
implementation(projects.opendcTelemetry.opendcTelemetryCompute)
- implementation(libs.opentelemetry.semconv)
- implementation(libs.kotlin.logging)
implementation(libs.config)
- implementation(libs.progressbar)
- implementation(libs.clikt)
+ implementation(libs.kotlin.logging)
+ implementation(libs.jackson.databind)
implementation(libs.jackson.module.kotlin)
implementation(libs.jackson.dataformat.csv)
implementation(kotlin("reflect"))
+ implementation(libs.opentelemetry.semconv)
- implementation(libs.parquet)
testImplementation(libs.log4j.slf4j)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
index faabe5cb..31e8f961 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -22,7 +22,8 @@
package org.opendc.experiments.capelin
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.composite
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
)
override val workload: Workload by anyOf(
- CompositeWorkload(
+ Workload(
"all-azure",
- listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0)
),
- CompositeWorkload(
+ Workload(
"solvinity-25-azure-75",
- listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
),
- CompositeWorkload(
+ Workload(
"solvinity-50-azure-50",
- listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5)
),
- CompositeWorkload(
+ Workload(
"solvinity-75-azure-25",
- listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25)
),
- CompositeWorkload(
+ Workload(
"all-solvinity",
- listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0)
)
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
index e1cf8517..cd093e6c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
index a995e467..73e59a58 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -22,8 +22,10 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByHpc
+import org.opendc.compute.workload.sampleByHpcLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.harness.dsl.anyOf
@@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(1.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
index 49559e0e..9d5717bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
index 1aac4f9e..7ab586b3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena")
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 6261ebbf..630b76c4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -24,16 +24,16 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import mu.KotlinLogging
-import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
-import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.ComputeWorkloadRunner
+import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
+import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
-import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -43,10 +43,8 @@ import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
-import java.io.FileInputStream
import java.time.Duration
import java.util.*
-import java.util.concurrent.ConcurrentHashMap
import kotlin.math.roundToLong
/**
@@ -91,33 +89,19 @@ abstract class Portfolio(name: String) : Experiment(name) {
abstract val allocationPolicy: String
/**
- * A map of trace readers.
+ * A helper class to load workload traces.
*/
- private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
+ private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path")))
/**
* Perform a single trial for this portfolio.
*/
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val workload = workload
- val workloadNames = if (workload is CompositeWorkload) {
- workload.workloads.map { it.name }
- } else {
- listOf(workload.name)
- }
- val rawReaders = workloadNames.map { workloadName ->
- traceReaders.computeIfAbsent(workloadName) {
- logger.info { "Loading trace $workloadName" }
- RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
- }
- }
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
- .read(FileInputStream(config.getString("interference-model")))
+ .read(File(config.getString("interference-model")))
.let { VmInterferenceModel(it, Random(seeder.nextLong())) }
else
null
@@ -125,14 +109,13 @@ abstract class Portfolio(name: String) : Experiment(name) {
val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
val failureModel =
if (operationalPhenomena.failureFrequency > 0)
- grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
- val simulator = ComputeServiceSimulator(
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environment.read(),
failureModel,
performanceInterferenceModel
)
@@ -142,17 +125,22 @@ abstract class Portfolio(name: String) : Experiment(name) {
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
+ val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
try {
- simulator.run(trace)
+ // Instantiate the desired topology
+ runner.apply(topology)
+
+ // Run the workload trace
+ runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0])
logger.debug {
"Scheduler " +
"Success=${monitorResults.attemptsSuccess} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
index b6d3b30c..17ec48d4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
index 90840db8..98eb989d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
deleted file mode 100644
index babd8ada..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.env
-
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
-import java.io.File
-import java.io.FileInputStream
-import java.io.InputStream
-import java.util.*
-
-/**
- * A [EnvironmentReader] for the internal environment format.
- *
- * @param input The input stream describing the physical cluster.
- */
-class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader {
- /**
- * Construct a [ClusterEnvironmentReader] for the specified [file].
- */
- constructor(file: File) : this(FileInputStream(file))
-
- override fun read(): List<MachineDef> {
- var clusterIdCol = 0
- var speedCol = 0
- var numberOfHostsCol = 0
- var memoryPerHostCol = 0
- var coresPerHostCol = 0
-
- var clusterIdx = 0
- var clusterId: String
- var speed: Double
- var numberOfHosts: Int
- var memoryPerHost: Long
- var coresPerHost: Int
-
- val nodes = mutableListOf<MachineDef>()
- val random = Random(0)
-
- input.bufferedReader().use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the file
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEachIndexed { idx, line ->
- val values = line.split(";")
-
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- clusterIdCol = header["ClusterID"]!!
- speedCol = header["Speed"]!!
- numberOfHostsCol = header["numberOfHosts"]!!
- memoryPerHostCol = header["memoryCapacityPerHost"]!!
- coresPerHostCol = header["coreCountPerHost"]!!
- return@forEachIndexed
- }
-
- clusterIdx++
- clusterId = values[clusterIdCol].trim()
- speed = values[speedCol].trim().toDouble() * 1000.0
- numberOfHosts = values[numberOfHostsCol].trim().toInt()
- memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
- coresPerHost = values[coresPerHostCol].trim().toInt()
-
- val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
- val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
-
- repeat(numberOfHosts) {
- nodes.add(
- MachineDef(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$it",
- mapOf("cluster" to clusterId),
- MachineModel(
- List(coresPerHost) { coreId ->
- ProcessingUnit(unknownProcessingNode, coreId, speed)
- },
- listOf(unknownMemoryUnit)
- ),
- // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
- // power draw of 350W.
- // Source: https://stackoverflow.com/questions/6128960
- LinearPowerModel(350.0, idlePower = 200.0)
- )
- )
- }
- }
- }
-
- return nodes
- }
-
- override fun close() {
- input.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index c4ddd158..a2e71243 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,23 +22,12 @@
package org.opendc.experiments.capelin.model
-public enum class SamplingStrategy {
- REGULAR,
- HPC,
- HPC_LOAD
-}
+import org.opendc.compute.workload.ComputeWorkload
/**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
- public open val name: String,
- public val fraction: Double,
- public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
+ * A single workload originating from a trace.
+ *
+ * @param name the name of the workload.
+ * @param source The source of the workload data.
*/
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
- Workload(name, -1.0)
+data class Workload(val name: String, val source: ComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
new file mode 100644
index 00000000..b8b65d28
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.topology
+
+/**
+ * Definition of a compute cluster modeled in the simulation.
+ *
+ * @param id A unique identifier representing the compute cluster.
+ * @param name The name of the cluster.
+ * @param cpuCount The total number of CPUs in the cluster.
+ * @param cpuSpeed The speed of a CPU in the cluster in MHz.
+ * @param memCapacity The total memory capacity of the cluster (in MiB).
+ * @param hostCount The number of hosts in the cluster.
+ * @param memCapacityPerHost The memory capacity per host in the cluster (MiB).
+ * @param cpuCountPerHost The number of CPUs per host in the cluster.
+ */
+public data class ClusterSpec(
+ val id: String,
+ val name: String,
+ val cpuCount: Int,
+ val cpuSpeed: Double,
+ val memCapacity: Double,
+ val hostCount: Int,
+ val memCapacityPerHost: Double,
+ val cpuCountPerHost: Int
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
new file mode 100644
index 00000000..5a175f2c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.topology
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.MappingIterator
+import com.fasterxml.jackson.databind.ObjectReader
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A helper class for reading a cluster specification file.
+ */
+class ClusterSpecReader {
+ /**
+ * The [CsvMapper] to map the environment file to an object.
+ */
+ private val mapper = CsvMapper()
+
+ /**
+ * The [ObjectReader] to convert the lines into objects.
+ */
+ private val reader: ObjectReader = mapper.readerFor(Entry::class.java).with(schema)
+
+ /**
+ * Read the specified [file].
+ */
+ fun read(file: File): List<ClusterSpec> {
+ return reader.readValues<Entry>(file).use { read(it) }
+ }
+
+ /**
+ * Read the specified [input].
+ */
+ fun read(input: InputStream): List<ClusterSpec> {
+ return reader.readValues<Entry>(input).use { read(it) }
+ }
+
+ /**
+ * Convert the specified [MappingIterator] into a list of [ClusterSpec]s.
+ */
+ private fun read(it: MappingIterator<Entry>): List<ClusterSpec> {
+ val result = mutableListOf<ClusterSpec>()
+
+ for (entry in it) {
+ val def = ClusterSpec(
+ entry.id,
+ entry.name,
+ entry.cpuCount,
+ entry.cpuSpeed * 1000, // Convert to MHz
+ entry.memCapacity * 1000, // Convert to MiB
+ entry.hostCount,
+ entry.memCapacityPerHost * 1000,
+ entry.cpuCountPerHost
+ )
+ result.add(def)
+ }
+
+ return result
+ }
+
+ private open class Entry(
+ @JsonProperty("ClusterID")
+ val id: String,
+ @JsonProperty("ClusterName")
+ val name: String,
+ @JsonProperty("Cores")
+ val cpuCount: Int,
+ @JsonProperty("Speed")
+ val cpuSpeed: Double,
+ @JsonProperty("Memory")
+ val memCapacity: Double,
+ @JsonProperty("numberOfHosts")
+ val hostCount: Int,
+ @JsonProperty("memoryCapacityPerHost")
+ val memCapacityPerHost: Double,
+ @JsonProperty("coreCountPerHost")
+ val cpuCountPerHost: Int
+ )
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("ClusterID", CsvSchema.ColumnType.STRING)
+ .addColumn("ClusterName", CsvSchema.ColumnType.STRING)
+ .addColumn("Cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Speed", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory", CsvSchema.ColumnType.NUMBER)
+ .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER)
+ .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER)
+ .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setColumnSeparator(';')
+ .setUseHeader(true)
+ .build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
new file mode 100644
index 00000000..5ab4261a
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TopologyFactories")
+package org.opendc.experiments.capelin.topology
+
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import java.io.File
+import java.io.InputStream
+import java.util.*
+import kotlin.math.roundToLong
+
+/**
+ * A [ClusterSpecReader] that is used to read the cluster definition file.
+ */
+private val reader = ClusterSpecReader()
+
+/**
+ * Construct a [Topology] from the specified [file].
+ */
+fun clusterTopology(
+ file: File,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(file), powerModel, random)
+
+/**
+ * Construct a [Topology] from the specified [input].
+ */
+fun clusterTopology(
+ input: InputStream,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(input), powerModel, random)
+
+/**
+ * Construct a [Topology] from the given list of [clusters].
+ */
+fun clusterTopology(
+ clusters: List<ClusterSpec>,
+ powerModel: PowerModel,
+ random: Random = Random(0)
+): Topology {
+ return object : Topology {
+ override fun resolve(): List<HostSpec> {
+ val hosts = mutableListOf<HostSpec>()
+ for (cluster in clusters) {
+ val cpuSpeed = cluster.cpuSpeed
+ val memoryPerHost = cluster.memCapacityPerHost.roundToLong()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cluster.cpuCountPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val machineModel = MachineModel(
+ List(cluster.cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) },
+ listOf(unknownMemoryUnit)
+ )
+
+ repeat(cluster.hostCount) {
+ val spec = HostSpec(
+ UUID(random.nextLong(), it.toLong()),
+ "node-${cluster.name}-$it",
+ mapOf("cluster" to cluster.id),
+ machineModel,
+ SimplePowerDriver(powerModel)
+ )
+
+ hosts += spec
+ }
+ }
+
+ return hosts
+ }
+
+ override fun toString(): String = "ClusterSpecTopology"
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
deleted file mode 100644
index 0bf4ada6..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param rawReaders The internal raw trace readers to use.
- * @param workload The workload to read.
- * @param seed The seed to use for sampling.
- */
-public class ParquetTraceReader(
- rawReaders: List<RawParquetTraceReader>,
- workload: Workload,
- seed: Int
-) : TraceReader<SimWorkload> {
- /**
- * The iterator over the actual trace.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>> =
- rawReaders
- .map { it.read() }
- .run {
- if (workload is CompositeWorkload) {
- this.zip(workload.workloads)
- } else {
- this.zip(listOf(workload))
- }
- }
- .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) }
- .iterator()
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
deleted file mode 100644
index ed82217d..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.filter2.predicate.Statistics
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.util.parquet.LocalInputFile
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
- *
- * @param traceFile The directory of the traces.
- * @param selectedVms The list of VMs to read from the trace.
- */
-class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
- private val logger = KotlinLogging.logger {}
-
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * The intermediate buffer to store the read records in.
- */
- private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
-
- /**
- * An optional filter for filtering the selected VMs
- */
- private val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(
- FilterApi.userDefined(
- FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
- )
- )
- )
-
- /**
- * A poisonous fragment.
- */
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
-
- /**
- * The thread to read the records in.
- */
- private val readerThread = thread(start = true, name = "sc20-reader") {
- val reader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- try {
- while (true) {
- val record = reader.read()
-
- if (record == null) {
- queue.put(poison)
- break
- }
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
- cpuUsage,
- cores
- )
-
- queue.put(id to fragment)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- reader.close()
- }
- }
-
- /**
- * Fill the buffers with the VMs
- */
- private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
- if (!hasNext) {
- return
- }
-
- val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
- queue.drainTo(fragments)
-
- for ((id, fragment) in fragments) {
- if (id == poison.first) {
- hasNext = false
- return
- }
- buffers[id]?.forEach { it.add(fragment) }
- }
- }
-
- /**
- * A flag to indicate whether the reader has more entries.
- */
- private var hasNext: Boolean = true
-
- /**
- * Initialize the reader.
- */
- init {
- val takenIds = mutableSetOf<UUID>()
- val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
-
- val metaReader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- while (true) {
- val record = metaReader.read() ?: break
- val id = record["id"].toString()
- entries[id] = record
- }
-
- metaReader.close()
-
- val selection = selectedVms.ifEmpty { entries.keys }
-
- // Create the entry iterator
- iterator = selection.asSequence()
- .mapNotNull { entries[it] }
- .mapIndexed { index, record ->
- val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
-
- assert(uid !in takenIds)
- takenIds += uid
-
- logger.info { "Processing VM $id" }
-
- val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence {
- var time = submissionTime
- repeat@ while (true) {
- if (externalBuffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
- }
- }
-
- internalBuffer.addAll(externalBuffer)
- externalBuffer.clear()
-
- for (fragment in internalBuffer) {
- yield(fragment)
-
- time += fragment.duration
- if (time >= endTime) {
- break@repeat
- }
- }
-
- internalBuffer.clear()
- }
-
- buffers.remove(id)
- }
- val workload = SimTraceWorkload(fragments)
- val meta = mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
-
- TraceEntry(uid, id, submissionTime, workload, meta)
- }
- .sortedBy { it.start }
- .toList()
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {
- readerThread.interrupt()
- }
-
- private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
- override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
-
- override fun canDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
- }
-
- override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index cb32ce88..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- return when {
- workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
- workload.samplingStrategy == SamplingStrategy.HPC ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
- workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
- else ->
- sampleRegularWorkload(trace, workload, workload, seed)
- }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- val fraction = subWorkload.fraction
-
- val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- shuffled.sumOf { it.meta.getValue("total-load") as Double }
- }
- var currentLoad = 0.0
-
- for (entry in shuffled) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- seed: Int,
- sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
- val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
- val random = Random(seed)
-
- val fraction = workload.fraction
- val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- trace.sumOf { it.meta.getValue("total-load") as Double }
- }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<TraceEntry<SimWorkload>>()
-
- if (sampleOnLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * trace.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.meta.getValue("total-load") as Double
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * trace.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.meta.getValue("total-load") as Double
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
deleted file mode 100644
index 7dd8161d..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-
-/**
- * Schema for the resources table in the trace.
- */
-val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.trace.capelin")
- .fields()
- .requiredString("id")
- .requiredLong("submissionTime")
- .requiredLong("endTime")
- .requiredInt("maxCores")
- .requiredLong("requiredMemory")
- .endRecord()
-
-/**
- * Schema for the resource states table in the trace.
- */
-val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.trace.capelin")
- .fields()
- .requiredString("id")
- .requiredLong("time")
- .requiredLong("duration")
- .requiredInt("cores")
- .requiredDouble("cpuUsage")
- .requiredLong("flops")
- .endRecord()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
index b55bd577..67de2777 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.experiments.capelin.util
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
index d1c01b8e..d46b50c3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
@@ -36,7 +36,7 @@
<Logger name="org.opendc.experiments.capelin" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
- <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false">
+ <Logger name="org.opendc.experiments.vm.trace" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hadoop" level="warn" additivity="false">
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 727530e3..ac2ea646 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,16 +31,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
-import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
-import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.compute.workload.*
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
@@ -66,6 +62,11 @@ class CapelinIntegrationTest {
private lateinit var computeScheduler: FilterScheduler
/**
+ * The [ComputeWorkloadLoader] responsible for loading the traces.
+ */
+ private lateinit var workloadLoader: ComputeWorkloadLoader
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
@@ -75,6 +76,7 @@ class CapelinIntegrationTest {
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
}
/**
@@ -82,26 +84,24 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val traceReader = createTestTraceReader()
- val environmentReader = createTestEnvironmentReader()
-
- val simulator = ComputeServiceSimulator(
+ val workload = createTestWorkload(1.0)
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
- computeScheduler,
- environmentReader.read(),
+ computeScheduler
)
-
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val topology = createTopology()
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
try {
- simulator.run(traceReader)
+ runner.apply(topology)
+ runner.run(workload, 0)
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -117,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(223331032, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006568, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159379, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.841120890240688E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -131,20 +131,19 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
- val environmentReader = createTestEnvironmentReader("single")
+ val workload = createTestWorkload(0.25, seed)
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
- computeScheduler,
- environmentReader.read(),
+ computeScheduler
)
-
+ val topology = createTopology("single")
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
- simulator.run(traceReader)
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -162,9 +161,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(10998110, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740290, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -174,28 +173,26 @@ class CapelinIntegrationTest {
*/
@Test
fun testInterference() = runBlockingSimulation {
- val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
- val environmentReader = createTestEnvironmentReader("single")
-
+ val seed = 0
+ val workload = createTestWorkload(1.0, seed)
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
PerformanceInterferenceReader()
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environmentReader.read(),
interferenceModel = performanceInterferenceModel
)
-
+ val topology = createTopology("single")
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
- simulator.run(traceReader)
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -213,10 +210,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6013899, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724501, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(12530742, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(473394, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -226,21 +223,19 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
- val environmentReader = createTestEnvironmentReader("single")
-
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environmentReader.read(),
- grid5000(Duration.ofDays(7), seed)
+ grid5000(Duration.ofDays(7))
)
-
+ val topology = createTopology("single")
+ val workload = createTestWorkload(0.25, seed)
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
- simulator.run(traceReader)
+ simulator.apply(topology)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -258,31 +253,28 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(11134319, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9604081, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
+ { assertEquals(2559005056, monitor.uptime) { "Uptime incorrect" } }
)
}
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return ParquetTraceReader(
- listOf(RawParquetTraceReader(File("src/test/resources/trace"))),
- Workload("test", fraction),
- seed
- )
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ val source = trace("bitbrains-small").sampleByLoad(fraction)
+ return source.resolve(workloadLoader, Random(seed.toLong()))
}
/**
- * Obtain the environment reader for the test.
+ * Obtain the topology factory for the test.
*/
- private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader {
+ private fun createTopology(name: String = "topology"): Topology {
val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt"))
- return ClusterEnvironmentReader(stream)
+ return stream.use { clusterTopology(stream) }
}
class TestExperimentReporter : ComputeMonitor {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
new file mode 100644
index 00000000..da6e5330
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
new file mode 100644
index 00000000..fe0a254c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
deleted file mode 100644
index ee76d38f..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ /dev/null
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
deleted file mode 100644
index 9b1cde13..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ /dev/null
Binary files differ
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index 74384480..7d06ee62 100644
--- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -36,5 +36,4 @@ dependencies {
api(projects.opendcSimulator.opendcSimulatorNetwork)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcUtils)
- implementation(libs.yaml)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt
index 0c995f06..2694700c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/InterpolationPowerModel.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.compute.power
-import org.yaml.snakeyaml.Yaml
import kotlin.math.ceil
import kotlin.math.floor
import kotlin.math.max
@@ -37,8 +36,6 @@ import kotlin.math.min
* @see <a href="http://www.spec.org/power_ssj2008/results/res2011q1/">Machines used in the SPEC benchmark</a>
*/
public class InterpolationPowerModel(private val powerValues: List<Double>) : PowerModel {
- public constructor(hardwareName: String) : this(loadAveragePowerValue(hardwareName))
-
public override fun computePower(utilization: Double): Double {
val clampedUtilization = min(1.0, max(0.0, utilization))
val utilizationFlr = floor(clampedUtilization * 10).toInt()
@@ -63,14 +60,4 @@ public class InterpolationPowerModel(private val powerValues: List<Double>) : Po
* @return the power consumption for the given utilization percentage
*/
private fun getAveragePowerValue(index: Int): Double = powerValues[index]
-
- private companion object {
- private fun loadAveragePowerValue(hardwareName: String, path: String = "spec_machines.yml"): List<Double> {
- val content = this::class
- .java.classLoader
- .getResourceAsStream(path)
- val hardwareToAveragePowerValues: Map<String, List<Double>> = Yaml().load(content)
- return hardwareToAveragePowerValues.getOrDefault(hardwareName, listOf())
- }
- }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 48be8e1a..5a4c4f44 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -27,6 +27,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
+import kotlin.math.min
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -89,15 +90,16 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
return SimResourceCommand.Idle(timestamp)
}
+ val cores = min(cpu.node.coreCount, fragment.cores)
val usage = if (fragment.cores > 0)
- fragment.usage / fragment.cores
+ fragment.usage / cores
else
0.0
val deadline = timestamp + fragment.duration
val duration = deadline - now
val work = duration * usage / 1000
- return if (cpu.id < fragment.cores && work > 0.0)
+ return if (cpu.id < cores && work > 0.0)
SimResourceCommand.Consume(work, usage, deadline)
else
SimResourceCommand.Idle(deadline)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt
index ac2ed303..7852534a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PowerModelTest.kt
@@ -61,7 +61,8 @@ internal class PowerModelTest {
@Test
fun `compute power draw by the SPEC benchmark model`() {
- val powerModel = InterpolationPowerModel("IBMx3550M3_XeonX5675")
+ val ibm = listOf(58.4, 98.0, 109.0, 118.0, 128.0, 140.0, 153.0, 170.0, 189.0, 205.0, 222.0)
+ val powerModel = InterpolationPowerModel(ibm)
assertAll(
{ assertEquals(58.4, powerModel.computePower(0.0)) },
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
index e2e5ea6d..219002e0 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
@@ -47,7 +47,7 @@ public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus")
+public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count")
/**
* Memory capacity for the resource in KB.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
index 1933967e..b683923b 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
@@ -60,7 +60,7 @@ public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("reso
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_STATE_NCPUS: TableColumn<Int> = intColumn("resource_state:ncpus")
+public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count")
/**
* Total CPU capacity of the resource in MHz.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-trace/opendc-trace-azure/build.gradle.kts
index b0c0318f..8bde56cb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
+++ b/opendc-trace/opendc-trace-azure/build.gradle.kts
@@ -20,19 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.env
+description = "Support for Azure VM traces in OpenDC"
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.power.PowerModel
-import java.util.*
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
-/**
- * A definition of a machine in a cluster.
- */
-public data class MachineDef(
- val uid: UUID,
- val name: String,
- val meta: Map<String, Any>,
- val model: MachineModel,
- val powerModel: PowerModel
-)
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+ implementation(libs.jackson.dataformat.csv)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
index f98f4b2c..84c9b347 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.azure
+package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
@@ -37,7 +37,7 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa
/**
* The partitions that belong to the table.
*/
- private val partitions = Files.walk(path, 1)
+ private val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1)
.filter { !Files.isDirectory(it) && it.extension == "csv" }
.collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
.toSortedMap()
@@ -68,9 +68,9 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa
delegate.close()
delegate = nextDelegate()
+ this.delegate = delegate
}
- this.delegate = delegate
return delegate != null
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index f80c0e82..c17a17ab 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.azure
+package org.opendc.trace.azure
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
@@ -53,7 +53,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
when (parser.currentName) {
"timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
"vm id" -> id = parser.text
- "avg cpu" -> cpuUsagePct = parser.doubleValue
+ "CPU avg cpu" -> cpuUsagePct = parser.doubleValue
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt
index c9d4f7eb..96ee3158 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.azure
+package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
@@ -38,7 +38,7 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p
RESOURCE_ID,
RESOURCE_START_TIME,
RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
+ RESOURCE_CPU_COUNT,
RESOURCE_MEM_CAPACITY
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index b712b854..5ea97483 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -20,12 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.azure
+package org.opendc.trace.azure
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.apache.parquet.example.Paper.schema
import org.opendc.trace.*
import java.time.Instant
@@ -68,7 +67,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
+ RESOURCE_CPU_COUNT -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
}
@@ -79,8 +78,8 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
RESOURCE_ID -> id
RESOURCE_START_TIME -> startTime
RESOURCE_STOP_TIME -> stopTime
- RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
+ RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
+ RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -94,7 +93,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_NCPUS -> cpuCores
+ RESOURCE_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt
index 24c60bab..c7e7dc36 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.azure
+package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
@@ -29,7 +29,7 @@ import java.nio.file.Path
/**
* [Trace] implementation for the Azure v1 VM traces.
*/
-class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
+public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
override fun containsTable(name: String): Boolean = name in tables
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 744e43a0..1230d857 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.azure
+package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import com.fasterxml.jackson.dataformat.csv.CsvParser
@@ -32,11 +32,11 @@ import kotlin.io.path.exists
/**
* A format implementation for the Azure v1 format.
*/
-class AzureTraceFormat : TraceFormat {
+public class AzureTraceFormat : TraceFormat {
/**
* The name of this trace format.
*/
- override val name: String = "azure-v1"
+ override val name: String = "azure"
/**
* The [CsvFactory] used to create the parser.
diff --git a/opendc-trace/opendc-trace-azure/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-azure/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..08e75529
--- /dev/null
+++ b/opendc-trace/opendc-trace-azure/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.azure.AzureTraceFormat
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
new file mode 100644
index 00000000..e5735f0d
--- /dev/null
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.*
+import java.io.File
+import java.net.URL
+
+/**
+ * Test suite for the [AzureTraceFormat] class.
+ */
+class AzureTraceFormatTest {
+ private val format = AzureTraceFormat()
+
+ @Test
+ fun testTraceExists() {
+ val url = File("src/test/resources/trace").toURI().toURL()
+ assertDoesNotThrow {
+ format.open(url)
+ }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val url = File("src/test/resources/trace").toURI().toURL()
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(url.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val url = File("src/test/resources/trace").toURI().toURL()
+ val trace = format.open(url)
+
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val url = File("src/test/resources/trace").toURI().toURL()
+ val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val url = File("src/test/resources/trace").toURI().toURL()
+ val trace = format.open(url)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ @Test
+ fun testResources() {
+ val url = File("src/test/resources/trace").toURI().toURL()
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
+ { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
+ )
+
+ reader.close()
+ }
+
+ @Test
+ fun testSmoke() {
+ val url = File("src/test/resources/trace").toURI().toURL()
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_STATE_ID)) },
+ { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals(2.86979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) }
+ )
+
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-azure/src/test/resources/trace/vm_cpu_readings/vm_cpu_readings-file-1-of-125.csv b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vm_cpu_readings/vm_cpu_readings-file-1-of-125.csv
new file mode 100644
index 00000000..db6ddf8a
--- /dev/null
+++ b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vm_cpu_readings/vm_cpu_readings-file-1-of-125.csv
@@ -0,0 +1,100 @@
+0,+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q,2.052803,3.911587,2.86979
+0,2zrgeOqUDy+l0GVi5NXudU+3sqZH+nLowfcz+D/JsCymTXbKrRf1Hr3OjAtxjnKm,1.64695,8.794403,3.254472
+0,/34Wh1Kq/qkNkW0tQrMiQ1eZ8hg9hHopydCzsXriefhgrn+0Rg1j22k1IHcV6PIQ,2.440088,6.941048,4.33624
+0,2lzdXk1Rqn1ibH2kZhGamYTMvVcRP6+x8b5zGiD/8t++5BQhzU18hGaL5sfR01Lo,0.302992,2.046712,0.970692
+0,0GrUQuLhCER5bWWcoJAgblPJWkaU4v3nf+NUrZnFTlXWEK99qgTRBTkjjUjJVAqA,1.515922,4.471657,2.438805
+0,2I8OpI6bMkdzL3HYLz4KlBcDhy2VTEm3skQbOvEo9rLoxryB0iB9iVh3rGd5DW2j,0.148552,0.315007,0.264341
+0,2IuuDcRMd97gln/+CrgPqI/fwffx67s87T1odKrA0wLYf8YuzGooHdKihitv2Q+s,0.169838,2.277277,0.859669
+0,2KaB1faO0ZB2KqB8MGwasWkqRLJHIE6+2wPuhzlzLNEUyeGzo0dU7brFa/cll/VJ,0.539162,1.371926,0.782212
+0,2BMVXt472mr/Y8m1vaaGoyGTSXcLvXk968PCHixwCDjPSgCm7yYSimGuBw7VPIiS,3.625195,7.211996,4.807884
+0,/3+EY60PnzKwod6nAUGBFSDDpBBOVEVUi90JWWWjPAlNyTUrGwlfQcSDoSkRumD7,0.180582,1.313473,0.43792
+0,+hulsuci78MKSG60G/gHJLqmz5/TFEB3WpS6HI1G1mm052le8oeemF3kz3eoPsnS,2.653344,9.983403,4.262461
+0,0O4otykohyRcsqqsg68kqo6ZCY6sL6eQLHUMYZxGVRhwQmTXRUN89izib3pOucrC,0.72983,4.516831,1.846142
+0,239KfRqrlUdyYuU0ubcASPKztu3q7hernahrolO5AczjUFI/QgoU+OoKzPuivFHQ,1.42953,11.553488,4.271241
+0,/SVzWHvPhr7KAIOUFr10EK8WdKXbrJojgcc4IGvutJ2S6HpRMD0zTfv/h0720+Q6,1.676634,6.703915,3.102252
+0,2a7bYEHqZvcgOeos5Q3J5qxpY4lXinv8M9mORfel5DlWRut0JynZtobNGNlBWn41,0.54178,8.239316,2.234877
+0,1NwFYwEAgv8qnCaWzzWv9hHj0TIJAZ2HT+iH+dsZKeSAPGoJGyVSDB+Zj4EuqWRC,2.933124,4.630701,3.975568
+0,3rg4SRyS/p6eMuGCJpjkz4oHzXSeeF16a7jJ9GAAYPiAQAsQNOEjHOe07on5RbjK,0.719536,3.383279,1.506528
+0,0DVV+uR/jr4XbwYQhVf2Yg0Kg7DfIDa7qJNzqvjVgEqGRJAUisrnYFv7AWr1k7by,0.949333,22.157649,3.751856
+0,3bHtb6EIFo9yXByIhpVDOJ7bzbIQvnGGb+jm8eOsEf0eKbrKMJvUiOYc6Wq3DXbR,3.504534,15.581505,6.388424
+0,0O5yc/ZVSHWxf4UHf/1b8Nut9raakorgqDwGV9k7TJdq55alNeMDB7CREuxZystP,7.587743,20.323464,16.540802
+0,0ZbYi+cMH7hCzT+8ICYVp5ZgcRUFNKsODuH09bbPdPioUPCPkBK2PM2oHhE3y4I6,2.694185,6.361789,4.55337
+0,1jw70sEl89jY2iRpd38PuYSBiOcuwe6tF4Q+YuGBJg20+gRIW3A7H3WZ+uL0EVmb,3.570395,5.707428,4.233997
+0,24MvpVXzcNO2qxwF4hMwCToFTBfoAE5xUQ4L6fwfWuBZ1GW06hHh5jWwWCu+8lPm,5.102273,8.678012,6.369649
+0,2gHzFAqM+fL7f1wtNETuzSoM7I6xlEWk2BJmj1SNXly/7z1RQFmwYRXU49DiYciJ,0.27374,54.447146,5.445003
+0,/hCom+lGMIkeE1wQi+VTFh+zzgbikbO0jQDzchDMCUNSgo6cEJfD1sIT2Ok4NlD6,0.170892,1.843549,0.737087
+0,2UwesOu8HXTdHyj0jd1agckz1KH5+Z4KOFe+wKFo9uvRI4GalozAPaxsMrBmx7Wo,3.349887,6.272554,4.425039
+0,1V8Fr/ZhjQcxql5s9p3hA1b0Wx6Sx9e+np1OImlp3GKyleH87bYjmQLZJouKYJR2,2.022219,4.724097,2.616506
+0,23E/SPMZKCUWz8nBmuCdbNBWf9ou6IQuZjmh0x2/icPrbLLvUk5SvbTjwqoLQxBX,0,0.46365,0.178483
+0,0Mj8nT0fnkeMIbcTBf27pOtUuTtMZH8uAZqAViSaye+9mBIjsNPmU6Z5hLK6f2I0,15.023186,23.297875,18.965327
+0,2xM0uOcqSowNzsbFbzhy5J1Ms2vv0jVQ5aM+J2E/LCBzTVKPrCCeWQ/r/cKmS1Tm,8.272075,9.415241,8.797159
+0,0MYQXyW75q9UURkn+O/V6iww0JaBl2qRG0Mh2bqRcuU5/Ws+7HJMPKSzVKlUEgcU,3.798828,8.915124,4.856879
+0,/HQfnMjgclpCxPod9jmGVQxfTnsjyNWA4KNkLMn4IKRlqheUo9AhhWv4vAumZNqg,4.788548,7.269977,6.640435
+0,0Q2PP+9O7LcnNI7AJQQR7pwM4ISG4024Z+INOw+TWgf2DCl8/prdGC7QJRGjc+Aa,0.10703,0.183798,0.136907
+0,/zLQxB1DGXC7iK7JeyYrUSguf6DjNA1MVTJzieRWmcobm0M+xgd28r842y3p5u5J,1.306953,3.22913,2.226509
+0,42cXpXkVqdXH/ok/tD46zKKCToy0k6HXoH3x7eeo4+zIva3IJKle5xfSEW3R45ON,1.018462,3.240817,2.196357
+0,+9HYwMx1Ckj15bJswEycBgiBSfrBw5NJE3p86IeFpFYKKxdw3NzMPTFKpg67XhsF,1.859664,7.255261,3.501303
+0,10KKTL95cApo6Pf24KZqgrM67v4M6rgZBoX+w/I3j4KS66FNhKomGnap9H8SVAvy,0.041225,2.593651,0.25894
+0,+LyaeKb1faiLEjAzynXF3xO/ZAho1R/Zyh1H4d45+NGsIJR6ryUTDmhyNvMh1wQ9,4.614357,11.692623,6.05005
+0,1SS5EeD9rxdWRFYBkR36PAd96w+Q7V2V4fDcc/2IJ1L07In7RGpQk/HVcOTKd78w,0.020435,0.515471,0.135453
+0,+HFoxb6Eu9kwzVkxs+A+9Q7zXa4aSIcOFm3AnYDCTQQMYyf6EST9nSHslGhUkgAD,8.53904,48.459572,16.166212
+0,+N+B5FPJIUVyH9v1Zcc+kjSTNvULkosDBM48N2JkDjhuVhQtWSfYQMQTQkGeVjLi,3.139119,99.036916,51.090982
+0,1ey9c7Hc1FyxLVbESoty7AkXbuENFSDXRAZiizFifRmJNM6IEx9eNu3bkUR+qCUJ,2.466582,5.842213,3.765056
+0,35F/52yPsKPGondM8xnzX68EKiKiKiZMDqsVnvc9ZOAc/rS3zvQ6YYj3QkLAHFhN,1.963258,43.494868,16.459037
+0,2KX+BTc0TPZOtCgbzKtKvP1yrM+Cc3WQU9DPkZDFD/5aNN/aPV40aQCKwW/HeTzh,1.040522,5.961609,3.305858
+0,+8X+qRHRLwwgj70uuXrkus7lrNtjMeTHfy5yQgymNJI+yFd5pbhRfStfS7lkVOhP,0.436353,15.995153,1.431229
+0,/g5MAtFnYaMO5MpJg40BsFmhS22s0tfwHiivGhPbcZ+KgEAtNxKkFdZYDtrDUUFO,6.905489,8.196952,7.527238
+0,/ke0seVq80UFQeXSTUh5hTrjghtn5qqWf38lQVTis+/ZR6Pdv5vdAotz4dvZcKDp,6.444482,23.136676,15.470455
+0,+tQeKqKqbAui7YXK0Efk3GUnvbzM+0pOpmOJ6OhkMSozjRyl5tHl7+mZwFznU3Mk,17.90259,20.095464,18.937014
+0,/hiC5yD45GhNtMpJTVwVF5ZnNNWfEHttESv/+KH6go9FBoncns+CuQ1M92c0xzFA,2.290396,2.609893,2.523336
+0,0i9+1LVd2t4m1KScDuoJnAAEL0bz9UGXh2iLAGV/8Eq5hTsAliyraV7j6wsf2MZX,4.266491,16.607137,6.929279
+0,2PVcv0/vy8mIjzH7CiB9cJU737jRi6kAO7PhqkxEWA4GrxvaCsK3ZDckhD8YR04U,1.048596,2.309172,1.447266
+0,/kbT+MIfY7jEW2Nn+TKf5BKkLAmBslDqKuZ8HI2Ire6eMKinGP7aTt6SY77vt8PK,2.409783,7.79851,5.552826
+0,2cCRKSXs9v9tPskjJn8UmV15qynI3I3GLPTor/i81nxh5Ocwb7Fq1zwEN5zmtXyx,0.356014,1.468193,0.781642
+0,2qsVNbcvPD0H3cs/p/6MTpuvUBtr5QN3iavAmkCQBCtrHcEpgskYVJf/6WQkEhOF,2.688901,85.501739,37.676562
+0,30FpxnoytvMKoGeJYqwnuL2mPbvKlxpjPIfVT8LKqqFl9smEksQjEzG3lgxhT4U7,2.499018,6.534664,3.508567
+0,/f1C+4xtoPaBxD+FoFdM52MiaWXZEqPqSnBxz4q4XMzoXabJvdddHchLrxc6SlYc,1.894231,15.683948000000001,3.199591
+0,30tz9NOV1bIKUB6uIOy4qZT8BVk3escZ0bWXBD9oedOQN1Qi06pplm7WM9iMvvvL,0.959278,63.599827,14.983399
+0,2q0sA6/4VZfksnucqVASzYgruD9T0219afuGrf3O/u8jpGHpn0k3oWvY35I7x8F/,2.694575,11.900751,5.254742
+0,/Qq/SKTnRJ4RZPWKIdCyPmYQUf+csOcFYS+rVD+kc1OkLboeKHK7CLV88wVVLlm9,55.553347,99.204744,93.215797
+0,2PJIXiy3/m1MNf4SQAQ9xU+LDqsHvyyCIWA2X0nB9kgLyVNh3g9xxpAeUpkXgvK6,0.591771,0.676084,0.628958
+0,/VIH23Tzi+711eCdsc7apDAoSBY6hcNqCu8oaZcPrUQmUXUyH8HJS7Z1DyhR6j/I,3.136726,5.477124,4.036594
+0,3/bNFRCZog1M2qwSCcwMYYos07f/9kRsfeFyaOmT0mNx3ldbNvRRbMBhoseq0DIg,2.993954,5.787727,4.272684
+0,3F+42xbLAiVPTJeHpyDwx6ZXcxArLFiMGGZTa9jmsLIpxxkBqC1QwN8mAwzDqWsU,3.488578,6.178318,4.692753
+0,08iqvtN8ilXeJdfiL86fde5JRTrjuLTp8guNabblV7QqkkAL23TwtLdwuFtg4P9G,3.64316,22.992153,10.256498
+0,0ZiQ/5P4mgnYud0uaI1lZCIJaCzrlEJdnAz8bcFMLDFryCrUJJDecbWQbLo6K69J,2.924592,4.261972,3.543138
+0,28JHlDFu72v9lIhjKLF+h9g1pyPq9+ruVET8NnBGKksclnvwx0WlQ066nh6doanS,1.2833,1.589682,1.353967
+0,3ClcWgHBEw8WzFSqnMYKUib9Abx6RDf3ITN8ivUilopa4t+UTJU0Y/U25sT/1okS,1.387814,2.764987,2.116221
+0,/qj8bL8dARqa83U6HwU/bUF5kLq12PKaebM0/2WrM2a3oH+BCC/IxFf1PjIWBNC5,23.139855,97.95723,75.918613
+0,17KWFIkHqLQpslptyD70Qof2iISdFN4IzZBc/WffQeds/tDjuZ/1O4KY68u10srE,2.374392,4.461708,3.201956
+0,3fNyZ1Bf9hUvTVDbHwh8Fh3E2i0BgPPL3QkkS9T0cjanDQA0u0z/Y5TSdXldEJM8,1.199056,3.188352,2.14033
+0,3DYNNYBvhBlVPHsg1uoo7ZVjKX5k1c0gZsfc8W0o0cJ1WJAI8f049TnSu/yIfp/m,1.305688,4.700476,2.216015
+0,2e9qO7smv0DTuXeR3VEzG2jztbM9wntJ3bMt6/LlN3RZBQzIY9vP7FFsphJC9bsW,0.087859,22.556549,10.203507
+0,3EeP6Vgbh292ahLWQJrInzehyR4Nuj2vNtdWuEbvFjKcmCc2i6VZVN4dQTRfIVxR,7.663198,22.199953,15.461753
+0,/Vi7oNg70eAzJHXwsCM9nzwBMg4l7cMyZhUT14V48AWjIAQzVYsbdI0KwNlBAXhK,0.61977,2.24158,1.181003
+0,4/c7nkT3SrtRRrRCsZxUJXxJjUr61iivwZxdihwPAtpCDUawKfPUzaq/05zFYBAk,2.667104,7.383679,4.050989
+0,1HYzfmk+s4SedWtOeHk4j5Zj52ateGX5bRFK5K3rwTVdB2A2m+3iwbL1IEzx8ir8,5.366892,12.404488,6.877072
+0,3vPq2HsXQ9SQT+URugEaQ3ezvstcGd5Bt9FIiFx1SrUfUrvvi/Gj8Nyw5DZhvyAR,3.014601,13.363316,4.535414
+0,2YbmUab2MqBMpvMaoaMP3zVxOhgqkNytraWdt/GG261oZ/tmgEB239WsbKJh1bE3,3.121409,98.73306,51.009852
+0,1IYQhDD8NGuAFnVPnffmt1yk20B9JHQI5DMC4Ny09pe6Sedik6YCIIVeBHIEo34W,1.512222,3.53396,2.379989
+0,1SSMSUcJ7qKM7q2yka80+ZP0yYWiYxGQxcJ8KBi4+TsDpv5FLUS6i2DHLMtXB3An,3.9704,4.345802,4.126586
+0,17CA6zpUCxW+Pdh2g5W0kTdlPlgWbBKz4YrHvbGP/Hmf13nZQBc/VZO7EL6nM75C,8.052588,16.023168,13.600106
+0,04rwScmEvRr0aU/mAE7aKtKFwowolGaTAPyQHuaVKEFmEVMAKxo+7UBCk3vRRRBd,2.221999,5.809178,3.021269
+0,1H9K/TW4c28Aob/H1O53cyQT7pHRww0L1ocyn19z1+MxC+k+5M/PgEx9B3zT/CNf,2.985884,7.584636,3.995057
+0,2fgXOaNZld/i7o20ULRNhCeL+o+vgZYzDOIhQ2n28TcGxXR047+F1b7QiD+l1Ypf,0.068074,0.884132,0.239792
+0,+0bAvqEMTl/RGyFmuz4zJH3DLMI6Q+iHapYn5BpbZI+0PNNfM7PXm/mojw+e8Xpn,3.238927,4.259525,3.611511
+0,3OdFPkhA5Q99wyfxmgyxPAhWyDLkV++XFtPL8pD3w5f8mBWbokeBwgk4gmNIxCOL,0.461767,10.466777,4.985617
+0,0UE8gxQAdCGY+WGN9yd9CL2ZGGqoyGQ2PzQGndwecce24GyTUnuvREbnMWBZZ7bG,0.730279,6.785359,3.363408
+0,/Uk/U5u4d+KNQVPD63pklfxeWc2zDAkUnrVmvxgRTuqNFbn90h8TuU5GZ+OamGQ5,0.105853,1.739301,0.262678
+0,2im96EJfLyxm7TPrtOR9m6Inq4E4/qR+AvP0TbnSdvzXI+N9gHh7C2fzppzcR0i8,0.325895,2.012216,0.802437
+0,+CrXBNhT3ch1hYU2e9IGs7wfjSLRkKYgidJYc42LlsH39cYtwdAX3wKm1OGlf+Kl,18.815771,40.850218,22.470045
+0,/hXRrrjPrAw8xDSsJnEwLdkRN1e42zJLE/HO5DXk5gbGLRmRx5H9n4T0UmraZ8uW,0.361838,0.831517,0.423214
+0,/sTadDDv8poFeLWS7lD/SEtEgWCBHXB1IaiitjCru4AcK8Z32hNXlccdY8hlFzTp,3.203254,5.682829,3.859569
+0,333YaK054AGlUYuw0XWxYn5K8NwzhfzJ3mm4YNwB1YXKjgnO64ZItBNaBRQoOgXn,0.124811,0.384592,0.257066
+0,+ZkQz7QrPZIODz45A+60ZFnG18jnyYlSY/IgEe1Yj8c4cU8h+L8WDIKMv2uB7EwD,1.022656,6.508863,3.368929
+0,+X4DW7zA6whRfOWSHHONJ1u3f0DyBvC9PqDmXGFfbxT4aUGCC6kVm6fuGu9IsQyL,3.428286,15.183059,5.743137
+0,2KXdN0Pb4iyu0jVPocTTf3dwk2Z1LjIlAcydV3HURGIUn1dTycCDDCHg5G6l6i9t,0.282044,0.40582,0.311669
+0,2lGxRtUbBrRZmIYagONMp6vj0zHk4EGhu0aSH5Ws/CAXwBNZpCavBFDNCEcPsOkt,3.662958,8.660027,5.281077
+0,+IR6CKA4zeO742dCx1l2hR0plhTanlaxPWAbckkZNo6UAti83TpYPRXrrfdmm9Ar,0.086237,2.450893,0.969819
+0,2/hWJ+i+1FSHiD44Rr3S4xWMUHC6hIgoVBX2XGZ7cOFyLn9FWQ3Kevsocw7CGaxJ,1.499537,2.832775,1.900258
+0,1WnALZnCvRlfqnuRyrIf0wxQOGLhGuvxInHelnMBM6cw9G9hydTBxqV60JSL/48p,0.717535,5.066802,1.448937
diff --git a/opendc-trace/opendc-trace-azure/src/test/resources/trace/vmtable/vmtable.csv b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vmtable/vmtable.csv
new file mode 100644
index 00000000..299c518c
--- /dev/null
+++ b/opendc-trace/opendc-trace-azure/src/test/resources/trace/vmtable/vmtable.csv
@@ -0,0 +1,10 @@
+x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf,VDU4C8cqdr+ORcqquwMRcsBA2l0SC6lCPys0wdghKROuxPYysA2XYii9Y5ZkaYaq,Pc2VLB8aDxK2DCC96itq4vW/zVDp4wioAUiB3HoGSFYQ0o6/ZCegTpb9vEH4LeMTEWVObHTPRYEY81TYivZCMQ==,0,2591700,99.369869,3.4240942342446719,10.194309,Delay-insensitive,1,1.75
+H5CxmMoVcZSpjgGbohnVA3R+7uCTe/hM2ht2uIYi3t7KwXB4tkBxmZHBrt2A4x+n,BSXOcywx8pUU0DueDo6UMol1YzR6tn47KLEKaoXp0a1bf2PpzJ7n7lLlmhQ0OJf9,3J17LcV4gXjFat62qhVFRfoiWArHnY763HVqqI6orJCfV8h5j9yeotRMnCLlX1ooGkMyQ2MDOuY1oz111AGN9Q==,0,1539300,100,6.18178366757598,33.98136,Interactive,1,0.75
+wR/G1YUjpMP4zUbxGM/XJNhYS8cAK3SGKM2tqhF7VdeTUYHGktQiKQNoDTtYvnAc,VDU4C8cqdr+ORcqquwMRcsBA2l0SC6lCPys0wdghKROuxPYysA2XYii9Y5ZkaYaq,Pc2VLB8aDxK2DCC96itq4vW/zVDp4wioAUiB3HoGSFYQ0o6/ZCegTpb9vEH4LeMT+hzuAPZnYJMu61JNhTDF/Q==,2188800,2591700,99.569027,3.5736346071428589,7.92425,Delay-insensitive,1,1.75
+1XiU+KpvIa3T1XP8kk3ZY71Of03+ogFL5Pag9Mc2jBuh0YqeW0Zcb9lepKLdPEDg,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.405085,16.2876105408034,95.69789,Delay-insensitive,8,56
+z5i2HiSaz6ZdLR6PXdnDjGva3jIlkMPXx23VtfXx9q3dXFRBQrxCOj7sHUsrmFLa,VDU4C8cqdr+ORcqquwMRcsBA2l0SC6lCPys0wdghKROuxPYysA2XYii9Y5ZkaYaq,Pc2VLB8aDxK2DCC96itq4vW/zVDp4wioAUiB3HoGSFYQ0o6/ZCegTpb9vEH4LeMTEWVObHTPRYEY81TYivZCMQ==,0,2188500,98.967961,3.036037969572376,9.445484,Delay-insensitive,1,1.75
+n77nP00/UpJmT+Yx1ZkDphvAqPoHU8yUpDCwyUtPNlRENqvNp6Inya1eiy7VP1+x,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.448473,34.17401179027781,98.553018,Delay-insensitive,8,56
+aTSXW3N1KepxKYwKumd7T1+f7DkGolSKV8EArYAdctjD26YqSMKezCVSdvmSgqIQ,dBub/K+8I6jD9t2ExqUdRNlVxPPvDWqICA9Sr+yzcBZ/nNuC0W2swapPoBNIRoF+,C9GnRqFF2lzW/elUsLEwhyAQj9D/d5JIOOgvwfPL1aINf+m1f29G7nXhr6mRPGbiofmjfP9GkepcWz9LX5tp7Q==,2290500,2292300,94.113335,32.461745857142866,94.113335,Unkown,1,1.75
+uSkGH3DS6BVo3RFnw3GZb6WCFSmGgvgKi4HIj08yxO4f5ladUQc3pqDOtqRN0W9+,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.276369,1.3500837561060346,23.450372,Delay-insensitive,8,56
+ztRY/Sk5mrSFFcpy2usZ0YZZ7Eumq130/5BB8WVXfWaYvFkU+EhXUQ2kOFkCXuCw,dBub/K+8I6jD9t2ExqUdRNlVxPPvDWqICA9Sr+yzcBZ/nNuC0W2swapPoBNIRoF+,C9GnRqFF2lzW/elUsLEwhyAQj9D/d5JIOOgvwfPL1aINf+m1f29G7nXhr6mRPGbiofmjfP9GkepcWz9LX5tp7Q==,2281200,2300100,98.671595,43.724999781249991,98.13707,Unkown,1,1.75
+bJoIb8ras2ZNNSdAz3CAu4HYRd6k9MOqij/+6/+/5XaYw4+EoGdUEr74DCi974gJ,8u+M3WcFp8pq183WoMB79PhK7xUzbaviOBv0qWN6Xn4mbuNVM1GYJlIjswgit+k1,DHbeI+pYTYFjH8JAF8SewM0z/4SqQctvxcBRGIRglBmeLW5VjISVEw7/IpY345kHwHtk7+SKlEwc1upnT3PigA==,0,2591700,99.498748,18.989459534151351,94.751666,Interactive,8,56
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
index 67140fe9..4a60dff3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.sv
+package org.opendc.trace.bitbrains
import org.opendc.trace.*
import java.nio.file.Files
@@ -33,7 +33,7 @@ import kotlin.io.path.nameWithoutExtension
/**
* The resource state [Table] in the extended Bitbrains format.
*/
-internal class SvResourceStateTable(path: Path) : Table {
+internal class BitbrainsExResourceStateTable(path: Path) : Table {
/**
* The partitions that belong to the table.
*/
@@ -50,7 +50,7 @@ internal class SvResourceStateTable(path: Path) : Table {
RESOURCE_STATE_ID,
RESOURCE_STATE_CLUSTER_ID,
RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_CAPACITY,
RESOURCE_STATE_CPU_USAGE,
RESOURCE_STATE_CPU_USAGE_PCT,
@@ -77,9 +77,9 @@ internal class SvResourceStateTable(path: Path) : Table {
delegate.close()
delegate = nextDelegate()
+ this.delegate = delegate
}
- this.delegate = delegate
return delegate != null
}
@@ -118,7 +118,7 @@ internal class SvResourceStateTable(path: Path) : Table {
return if (it.hasNext()) {
val (_, path) = it.next()
val reader = path.bufferedReader()
- return SvResourceStateTableReader(reader)
+ return BitbrainsExResourceStateTableReader(reader)
} else {
null
}
@@ -131,8 +131,8 @@ internal class SvResourceStateTable(path: Path) : Table {
override fun newReader(partition: String): TableReader {
val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
val reader = path.bufferedReader()
- return SvResourceStateTableReader(reader)
+ return BitbrainsExResourceStateTableReader(reader)
}
- override fun toString(): String = "SvResourceStateTable"
+ override fun toString(): String = "BitbrainsExResourceStateTable"
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
index 6ea403fe..f1cf7307 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.sv
+package org.opendc.trace.bitbrains
import org.opendc.trace.*
import java.io.BufferedReader
@@ -29,7 +29,7 @@ import java.time.Instant
/**
* A [TableReader] for the Bitbrains resource state table.
*/
-internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
+internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader {
override fun nextRow(): Boolean {
reset()
@@ -81,7 +81,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
COL_ID -> id = field.trim()
- COL_MEM_CAPACITY -> memCapacity = field.toDouble()
+ COL_MEM_CAPACITY -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB
}
}
@@ -93,7 +93,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
RESOURCE_STATE_ID -> true
RESOURCE_STATE_CLUSTER_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_CAPACITY -> true
RESOURCE_STATE_CPU_USAGE -> true
RESOURCE_STATE_CPU_USAGE_PCT -> true
@@ -111,7 +111,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
RESOURCE_STATE_ID -> id
RESOURCE_STATE_CLUSTER_ID -> cluster
RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
+ RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
@@ -134,7 +134,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt
index dbd63de5..f16c493d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.sv
+package org.opendc.trace.bitbrains
import org.opendc.trace.*
import java.nio.file.Path
@@ -28,7 +28,7 @@ import java.nio.file.Path
/**
* [Trace] implementation for the extended Bitbrains format.
*/
-public class SvTrace internal constructor(private val path: Path) : Trace {
+public class BitbrainsExTrace internal constructor(private val path: Path) : Trace {
override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
@@ -38,8 +38,8 @@ public class SvTrace internal constructor(private val path: Path) : Trace {
return null
}
- return SvResourceStateTable(path)
+ return BitbrainsExResourceStateTable(path)
}
- override fun toString(): String = "SvTrace[$path]"
+ override fun toString(): String = "BitbrainsExTrace[$path]"
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 0cce8559..06388a84 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.sv
+package org.opendc.trace.bitbrains
import org.opendc.trace.spi.TraceFormat
import java.net.URL
@@ -30,18 +30,18 @@ import kotlin.io.path.exists
/**
* A format implementation for the extended Bitbrains trace format.
*/
-public class SvTraceFormat : TraceFormat {
+public class BitbrainsExTraceFormat : TraceFormat {
/**
* The name of this trace format.
*/
- override val name: String = "sv"
+ override val name: String = "bitbrains-ex"
/**
* Open the trace file.
*/
- override fun open(url: URL): SvTrace {
+ override fun open(url: URL): BitbrainsExTrace {
val path = Paths.get(url.toURI())
require(path.exists()) { "URL $url does not exist" }
- return SvTrace(path)
+ return BitbrainsExTrace(path)
}
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
index c9e5954d..7241b18b 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -50,7 +50,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path
override val columns: List<TableColumn<*>> = listOf(
RESOURCE_STATE_ID,
RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_CAPACITY,
RESOURCE_STATE_CPU_USAGE,
RESOURCE_STATE_CPU_USAGE_PCT,
@@ -78,9 +78,9 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path
delegate.close()
delegate = nextDelegate()
+ this.delegate = delegate
}
- this.delegate = delegate
return delegate != null
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index dab784c2..56e66f5c 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -115,7 +115,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
return when (column) {
RESOURCE_STATE_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_CAPACITY -> true
RESOURCE_STATE_CPU_USAGE -> true
RESOURCE_STATE_CPU_USAGE_PCT -> true
@@ -133,7 +133,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
val res: Any? = when (column) {
RESOURCE_STATE_ID -> partition
RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_COUNT -> cpuCores
RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
RESOURCE_STATE_CPU_USAGE -> cpuUsage
RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
@@ -156,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
index f18135d0..fd6a2180 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -1 +1,2 @@
org.opendc.trace.bitbrains.BitbrainsTraceFormat
+org.opendc.trace.bitbrains.BitbrainsExTraceFormat
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
new file mode 100644
index 00000000..2e4f176a
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.*
+import java.net.URL
+
+/**
+ * Test suite for the [BitbrainsExTraceFormat] class.
+ */
+class BitbrainsExTraceFormatTest {
+ private val format = BitbrainsExTraceFormat()
+
+ @Test
+ fun testTraceExists() {
+ val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
+ assertDoesNotThrow {
+ format.open(url)
+ }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(url.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
+ val trace = format.open(url)
+
+ assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
+ val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
+ val trace = format.open(url)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ @Test
+ fun testSmoke() {
+ val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt"))
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(1631911500, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals(21.2, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ )
+
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/resources/vm.txt b/opendc-trace/opendc-trace-bitbrains/src/test/resources/vm.txt
new file mode 100644
index 00000000..28bebb0c
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/resources/vm.txt
@@ -0,0 +1,2 @@
+1631911500 21.2 22.10 0.0 0.0 0.67 1.2 0.0 0.0 5 1 abc 1 0.01 1 10 0.0 0.0 2699 vm 4096
+1631911800 30.4 31.80 0.0 0.0 0.56 1.3 0.0 0.0 5 1 abc 1 0.02 1 10 0.0 0.0 2699 vm 4096
diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts
new file mode 100644
index 00000000..b9c242a1
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support for OpenDC-specific trace formats"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+
+ implementation(projects.opendcTrace.opendcTraceParquet)
+
+ testRuntimeOnly(libs.slf4j.simple)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
index f051bf88..bee4ba7e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.bp
+package org.opendc.trace.opendc
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
@@ -28,9 +28,9 @@ import org.opendc.trace.util.parquet.LocalParquetReader
import java.nio.file.Path
/**
- * The resource state [Table] in the Bitbrains Parquet format.
+ * The resource state [Table] in the OpenDC virtual machine trace format.
*/
-internal class BPResourceStateTable(private val path: Path) : Table {
+internal class OdcVmResourceStateTable(private val path: Path) : Table {
override val name: String = TABLE_RESOURCE_STATES
override val isSynthetic: Boolean = false
@@ -38,13 +38,13 @@ internal class BPResourceStateTable(private val path: Path) : Table {
RESOURCE_STATE_ID,
RESOURCE_STATE_TIMESTAMP,
RESOURCE_STATE_DURATION,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_USAGE,
)
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
- return BPResourceStateTableReader(reader)
+ return OdcVmResourceStateTableReader(reader)
}
override fun newReader(partition: String): TableReader {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index 0e7ee555..df3bcfa6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -20,8 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.bp
+package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -29,16 +30,28 @@ import java.time.Duration
import java.time.Instant
/**
- * A [TableReader] implementation for the Bitbrains Parquet format.
+ * A [TableReader] implementation for the OpenDC virtual machine trace format.
*/
-internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
/**
* The current record.
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -47,7 +60,7 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader
RESOURCE_STATE_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_USAGE -> true
else -> false
}
@@ -58,11 +71,11 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_STATE_ID -> record["id"].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
- RESOURCE_STATE_NCPUS -> record["cores"]
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_ID -> record[COL_ID].toString()
+ RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long)
+ RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long)
+ RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
+ RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -76,9 +89,8 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader
override fun getInt(column: TableColumn<Int>): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
-
return when (column) {
- RESOURCE_STATE_NCPUS -> record["cores"] as Int
+ RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +102,7 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader
override fun getDouble(column: TableColumn<Double>): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -99,5 +111,27 @@ internal class BPResourceStateTableReader(private val reader: LocalParquetReader
reader.close()
}
- override fun toString(): String = "BPResourceStateTableReader"
+ override fun toString(): String = "OdcVmResourceStateTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ COL_DURATION = schema.getField("duration").pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_TIMESTAMP = -1
+ private var COL_DURATION = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_CPU_USAGE = -1
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
index 5b0f013f..b1456560 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.bp
+package org.opendc.trace.opendc
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
@@ -28,9 +28,9 @@ import org.opendc.trace.util.parquet.LocalParquetReader
import java.nio.file.Path
/**
- * The resource [Table] in the Bitbrains Parquet format.
+ * The resource [Table] for the OpenDC virtual machine trace format.
*/
-internal class BPResourceTable(private val path: Path) : Table {
+internal class OdcVmResourceTable(private val path: Path) : Table {
override val name: String = TABLE_RESOURCES
override val isSynthetic: Boolean = false
@@ -38,13 +38,13 @@ internal class BPResourceTable(private val path: Path) : Table {
RESOURCE_ID,
RESOURCE_START_TIME,
RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
+ RESOURCE_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY,
)
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
- return BPResourceTableReader(reader)
+ return OdcVmResourceTableReader(reader)
}
override fun newReader(partition: String): TableReader {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 4416aae8..c52da62d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -20,24 +20,37 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.bp
+package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Instant
/**
- * A [TableReader] implementation for the Bitbrains Parquet format.
+ * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format.
*/
-internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
/**
* The current record.
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -46,7 +59,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
+ RESOURCE_CPU_COUNT -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
}
@@ -57,10 +70,10 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_ID -> record["id"].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_ID -> record[COL_ID].toString()
+ RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long)
+ RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long)
+ RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -77,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_NCPUS -> record["maxCores"] as Int
+ RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +103,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
+ RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -99,5 +112,27 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
reader.close()
}
- override fun toString(): String = "BPResourceTableReader"
+ override fun toString(): String = "OdcVmResourceTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_START_TIME = -1
+ private var COL_STOP_TIME = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_MEM_CAPACITY = -1
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt
index 486587b1..3e5029b4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.bp
+package org.opendc.trace.opendc
import org.opendc.trace.TABLE_RESOURCES
import org.opendc.trace.TABLE_RESOURCE_STATES
@@ -29,9 +29,9 @@ import org.opendc.trace.Trace
import java.nio.file.Path
/**
- * A [Trace] in the Bitbrains Parquet format.
+ * A [Trace] in the OpenDC virtual machine trace format.
*/
-public class BPTrace internal constructor(private val path: Path) : Trace {
+public class OdcVmTrace internal constructor(private val path: Path) : Trace {
override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
override fun containsTable(name: String): Boolean =
@@ -39,11 +39,11 @@ public class BPTrace internal constructor(private val path: Path) : Trace {
override fun getTable(name: String): Table? {
return when (name) {
- TABLE_RESOURCES -> BPResourceTable(path)
- TABLE_RESOURCE_STATES -> BPResourceStateTable(path)
+ TABLE_RESOURCES -> OdcVmResourceTable(path)
+ TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path)
else -> null
}
}
- override fun toString(): String = "BPTrace[$path]"
+ override fun toString(): String = "OdcVmTrace[$path]"
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
new file mode 100644
index 00000000..8edba725
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation of the OpenDC virtual machine trace format.
+ */
+public class OdcVmTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "opendc-vm"
+
+ /**
+ * Open a trace in the OpenDC virtual machine trace format at the given [url].
+ */
+ override fun open(url: URL): OdcVmTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return OdcVmTrace(path)
+ }
+
+ public companion object {
+ /**
+ * Schema for the resources table in the trace.
+ */
+ @JvmStatic
+ public val RESOURCES_SCHEMA: Schema = SchemaBuilder
+ .record("resource")
+ .namespace("org.opendc.trace.opendc")
+ .fields()
+ .requiredString("id")
+ .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredInt("cpu_count")
+ .requiredLong("mem_capacity")
+ .endRecord()
+
+ /**
+ * Schema for the resource states table in the trace.
+ */
+ @JvmStatic
+ public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
+ .record("resource_state")
+ .namespace("org.opendc.trace.opendc")
+ .fields()
+ .requiredString("id")
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredLong("duration")
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_usage")
+ .endRecord()
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..94094af4
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.opendc.OdcVmTraceFormat
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
new file mode 100644
index 00000000..42eb369e
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import org.opendc.trace.*
+import java.io.File
+import java.net.URL
+
+/**
+ * Test suite for the [OdcVmTraceFormat] implementation.
+ */
+internal class OdcVmTraceFormatTest {
+ private val format = OdcVmTraceFormat()
+
+ @Test
+ fun testTraceExists() {
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
+ assertDoesNotThrow { format.open(url) }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(url.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
+ val trace = format.open(url)
+
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
+ val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
+ val trace = format.open(url)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testResources(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1023", reader.get(RESOURCE_ID)) },
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1052", reader.get(RESOURCE_ID)) },
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1073", reader.get(RESOURCE_ID)) },
+ { assertFalse(reader.nextRow()) }
+ )
+
+ reader.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testSmoke(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_STATE_ID)) },
+ { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ )
+
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
new file mode 100644
index 00000000..d6ff09d8
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
new file mode 100644
index 00000000..5b6fa6b7
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
index a4676f31..086b900b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
@@ -21,7 +21,7 @@
*/
@file:JvmName("AvroUtils")
-package org.opendc.experiments.capelin.export.parquet
+package org.opendc.trace.util.parquet
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
@@ -29,16 +29,16 @@ import org.apache.avro.Schema
/**
* Schema for UUID type.
*/
-internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
/**
* Schema for timestamp type.
*/
-internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
/**
* Helper function to make a [Schema] field optional.
*/
-internal fun Schema.optional(): Schema {
+public fun Schema.optional(): Schema {
return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-trace/opendc-trace-tools/build.gradle.kts
index 49d5b4c5..35190dba 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -20,28 +20,28 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace.bp
+description = "Tools for working with workload traces"
-import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+/* Build configuration */
+plugins {
+ `kotlin-conventions`
+ application
+}
-/**
- * A format implementation for the GWF trace format.
- */
-public class BPTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "bitbrains-parquet"
+application {
+ mainClass.set("org.opendc.trace.tools.TraceConverterKt")
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+
+ implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcTrace.opendcTraceOpendc)
+ implementation(projects.opendcTrace.opendcTraceAzure)
+ implementation(projects.opendcTrace.opendcTraceBitbrains)
+
+ implementation(libs.kotlin.logging)
+ implementation(libs.clikt)
- /**
- * Open a Bitbrains Parquet trace.
- */
- override fun open(url: URL): BPTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return BPTrace(path)
- }
+ runtimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index 1f3878eb..322464cd 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.trace.tools
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
@@ -34,15 +34,15 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat
-import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA
-import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA
-import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
import org.opendc.trace.*
+import org.opendc.trace.azure.AzureTraceFormat
+import org.opendc.trace.bitbrains.BitbrainsExTraceFormat
import org.opendc.trace.bitbrains.BitbrainsTraceFormat
+import org.opendc.trace.opendc.OdcVmTraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.File
import java.util.*
+import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
import kotlin.math.roundToLong
@@ -50,12 +50,12 @@ import kotlin.math.roundToLong
/**
* A script to convert a trace in text format into a Parquet trace.
*/
-fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
/**
* Represents the command for converting traces
*/
-class TraceConverterCli : CliktCommand(name = "trace-converter") {
+internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The logger instance for the converter.
*/
@@ -79,7 +79,7 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") {
*/
private val format by option("-f", "--format", help = "input format of trace")
.choice(
- "solvinity" to SvTraceFormat(),
+ "solvinity" to BitbrainsExTraceFormat(),
"bitbrains" to BitbrainsTraceFormat(),
"azure" to AzureTraceFormat()
)
@@ -106,23 +106,28 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") {
logger.info { "Building resources table" }
val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(BP_RESOURCES_SCHEMA)
+ .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA)
.withCompressionCodec(CompressionCodecName.ZSTD)
.enablePageWriteChecksum()
.build()
val selectedVms = metaWriter.use { convertResources(trace, it) }
+ if (selectedVms.isEmpty()) {
+ logger.warn { "No VMs selected" }
+ return
+ }
+
logger.info { "Wrote ${selectedVms.size} rows" }
logger.info { "Building resource states table" }
val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(BP_RESOURCE_STATES_SCHEMA)
+ .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
.withCompressionCodec(CompressionCodecName.ZSTD)
- .enableDictionaryEncoding()
- .enablePageWriteChecksum()
+ .withDictionaryEncoding("id", true)
.withBloomFilterEnabled("id", true)
.withBloomFilterNDV("id", selectedVms.size.toLong())
+ .enableValidation()
.build()
val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
@@ -155,7 +160,7 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") {
startTime = min(startTime, timestamp)
stopTime = max(stopTime, timestamp)
- numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
+ numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT))
memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
@@ -170,13 +175,13 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") {
continue
}
- val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA)
+ val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
builder["id"] = id
- builder["submissionTime"] = startTime
- builder["endTime"] = stopTime
- builder["maxCores"] = numCpus
- builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
+ builder["start_time"] = startTime
+ builder["stop_time"] = stopTime
+ builder["cpu_count"] = numCpus
+ builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
logger.info { "Selecting VM $id" }
@@ -195,44 +200,58 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") {
var hasNextRow = reader.nextRow()
var count = 0
+ var lastId: String? = null
+ var lastTimestamp = 0L
while (hasNextRow) {
- var lastTimestamp = Long.MIN_VALUE
+ val id = reader.get(RESOURCE_STATE_ID)
- do {
- val id = reader.get(RESOURCE_STATE_ID)
+ if (id !in selectedVms) {
+ hasNextRow = reader.nextRow()
+ continue
+ }
- if (id !in selectedVms) {
- hasNextRow = reader.nextRow()
- continue
- }
+ val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA)
- builder["id"] = id
+ val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+ var timestamp = startTimestamp
+ var duration: Long
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
+ // Check whether the previous entry is from a different VM
+ if (id != lastId) {
+ lastTimestamp = timestamp - 5 * 60 * 1000L
+ }
+
+ do {
+ timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+
+ duration = timestamp - lastTimestamp
+ hasNextRow = reader.nextRow()
+
+ if (!hasNextRow) {
+ break
}
- val duration = timestamp - lastTimestamp
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val flops = (cpuUsage * duration / 1000.0).roundToLong()
+ val shouldContinue = id == reader.get(RESOURCE_STATE_ID) &&
+ abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 &&
+ cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ } while (shouldContinue)
- builder["time"] = timestamp
- builder["duration"] = duration
- builder["cores"] = cores
- builder["cpuUsage"] = cpuUsage
- builder["flops"] = flops
+ val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- writer.write(builder.build())
+ builder["id"] = id
+ builder["timestamp"] = startTimestamp
+ builder["duration"] = duration
+ builder["cpu_count"] = cpuCount
+ builder["cpu_usage"] = cpuUsage
- lastTimestamp = timestamp
- hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+ writer.write(builder.build())
count++
+
+ lastId = id
+ lastTimestamp = timestamp
}
return count
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 483558e1..1b518fee 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -28,14 +28,12 @@ import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import kotlinx.coroutines.*
import mu.KotlinLogging
-import org.opendc.experiments.capelin.*
-import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.env.MachineDef
+import org.opendc.compute.workload.*
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
-import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
@@ -43,6 +41,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
@@ -50,12 +49,12 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
import org.opendc.web.client.model.Scenario
-import org.opendc.web.client.model.Topology
import java.io.File
import java.net.URI
import java.time.Duration
import java.util.*
import org.opendc.web.client.model.Portfolio as ClientPortfolio
+import org.opendc.web.client.model.Topology as ClientTopology
private val logger = KotlinLogging.logger {}
@@ -129,18 +128,14 @@ class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> {
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMonitor.Result> {
val id = scenario.id
logger.info { "Constructing performance interference model" }
- val traceDir = File(
- tracePath,
- scenario.trace.traceId
- )
- val traceReader = RawParquetTraceReader(traceDir)
+ val workloadLoader = ComputeWorkloadLoader(tracePath)
val interferenceGroups = let {
- val path = File(traceDir, "performance-interference-model.json")
+ val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json")
val operational = scenario.operationalPhenomena
val enabled = operational.performanceInterferenceEnabled
@@ -156,7 +151,7 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
- runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
+ runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel)
}
}
@@ -171,8 +166,8 @@ class RunnerCli : CliktCommand(name = "runner") {
private suspend fun runRepeat(
scenario: Scenario,
repeat: Int,
- environment: EnvironmentReader,
- traceReader: RawParquetTraceReader,
+ topology: Topology,
+ workloadLoader: ComputeWorkloadLoader,
interferenceModel: VmInterferenceModel?
): WebComputeMonitor.Result {
val monitor = WebComputeMonitor()
@@ -186,23 +181,18 @@ class RunnerCli : CliktCommand(name = "runner") {
val operational = scenario.operationalPhenomena
val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
+ val workload = Workload(workloadName, trace(workloadName).sampleByLoad(workloadFraction))
- val trace = ParquetTraceReader(
- listOf(traceReader),
- Workload(workloadName, workloadFraction),
- repeat
- )
val failureModel =
if (operational.failuresEnabled)
- grid5000(Duration.ofDays(7), repeat)
+ grid5000(Duration.ofDays(7))
else
null
- val simulator = ComputeServiceSimulator(
+ val simulator = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environment.read(),
failureModel,
interferenceModel.takeIf { operational.performanceInterferenceEnabled }
)
@@ -210,7 +200,10 @@ class RunnerCli : CliktCommand(name = "runner") {
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1))
try {
- simulator.run(trace)
+ // Instantiate the topology onto the simulator
+ simulator.apply(topology)
+ // Run workload trace
+ simulator.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
simulator.close()
metricReader.close()
@@ -292,56 +285,62 @@ class RunnerCli : CliktCommand(name = "runner") {
}
/**
- * Convert the specified [topology] into an [EnvironmentReader] understood by Capelin.
 * Convert the specified [topology] into a [Topology] understood by OpenDC.
*/
- private fun convert(topology: Topology): EnvironmentReader {
- val nodes = mutableListOf<MachineDef>()
- val random = Random(0)
-
- val machines = topology.rooms.asSequence()
- .flatMap { room ->
- room.tiles.flatMap { tile ->
- tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
- }
- }
- for ((rack, machine) in machines) {
- val clusterId = rack.id
- val position = machine.position
-
- val processors = machine.cpus.flatMap { cpu ->
- val cores = cpu.numberOfCores
- val speed = cpu.clockRateMhz
- // TODO Remove hard coding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
- List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
- }
- }
- val memoryUnits = machine.memory.map { memory ->
- MemoryUnit(
- "Samsung",
- memory.name,
- memory.speedMbPerS,
- memory.sizeMb.toLong()
- )
- }
+ private fun convert(topology: ClientTopology): Topology {
+ return object : Topology {
+
+ override fun resolve(): List<HostSpec> {
+ val res = mutableListOf<HostSpec>()
+ val random = Random(0)
+
+ val machines = topology.rooms.asSequence()
+ .flatMap { room ->
+ room.tiles.flatMap { tile ->
+ tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
+ }
+ }
+ for ((rack, machine) in machines) {
+ val clusterId = rack.id
+ val position = machine.position
+
+ val processors = machine.cpus.flatMap { cpu ->
+ val cores = cpu.numberOfCores
+ val speed = cpu.clockRateMhz
+ // TODO Remove hard coding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.memory.map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.name,
+ memory.speedMbPerS,
+ memory.sizeMb.toLong()
+ )
+ }
- val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
+ val powerDriver = SimplePowerDriver(powerModel)
- nodes.add(
- MachineDef(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$position",
- mapOf("cluster" to clusterId),
- MachineModel(processors, memoryUnits),
- LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
- )
- )
- }
+ val spec = HostSpec(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf("cluster" to clusterId),
+ MachineModel(processors, memoryUnits),
+ powerDriver
+ )
+
+ res += spec
+ }
+
+ return res
+ }
- return object : EnvironmentReader {
- override fun read(): List<MachineDef> = nodes
- override fun close() {}
+ override fun toString(): String = "WebRunnerTopologyFactory"
}
}
}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 933febe0..587f1cb2 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -25,6 +25,7 @@ include(":opendc-platform")
include(":opendc-compute:opendc-compute-api")
include(":opendc-compute:opendc-compute-service")
include(":opendc-compute:opendc-compute-simulator")
+include(":opendc-compute:opendc-compute-workload")
include(":opendc-workflow:opendc-workflow-api")
include(":opendc-workflow:opendc-workflow-service")
include(":opendc-faas:opendc-faas-api")
@@ -50,7 +51,10 @@ include(":opendc-trace:opendc-trace-swf")
include(":opendc-trace:opendc-trace-wtf")
include(":opendc-trace:opendc-trace-wfformat")
include(":opendc-trace:opendc-trace-bitbrains")
+include(":opendc-trace:opendc-trace-azure")
+include(":opendc-trace:opendc-trace-opendc")
include(":opendc-trace:opendc-trace-parquet")
+include(":opendc-trace:opendc-trace-tools")
include(":opendc-harness:opendc-harness-api")
include(":opendc-harness:opendc-harness-engine")
include(":opendc-harness:opendc-harness-cli")
diff --git a/traces/bitbrains-small/meta.parquet b/traces/bitbrains-small/meta.parquet
index 43f51cb8..da6e5330 100644
--- a/traces/bitbrains-small/meta.parquet
+++ b/traces/bitbrains-small/meta.parquet
Binary files differ
diff --git a/traces/bitbrains-small/trace.parquet b/traces/bitbrains-small/trace.parquet
index f4dd5a50..fe0a254c 100644
--- a/traces/bitbrains-small/trace.parquet
+++ b/traces/bitbrains-small/trace.parquet
Binary files differ