summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-30 20:57:16 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-03 20:47:12 +0200
commit448b4cafe3c757812138a8ca7580975191ac2f9c (patch)
treec3d25329908f718f1f7eae1dc944ef532f1849f1 /opendc-experiments
parent07743e75891e8b3ebcefe4771f92af8003ef0b1f (diff)
refactor(exp/compute): Integrate compute workload classes
This change integrates the classes from the old `opendc-compute-workload` module into the `opendc-experiments-compute` module. This new module contains helper classes for setting up experiments with the OpenDC compute service.
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt9
-rw-r--r--opendc-experiments/opendc-experiments-compute/build.gradle.kts12
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt86
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt6
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt35
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt255
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt62
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt39
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt70
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt2
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt117
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt54
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt67
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt132
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt210
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt198
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt128
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt38
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt66
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt143
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt61
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt37
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt427
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt47
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt (renamed from opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeMonitorProvisioningStep.kt)4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt28
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt125
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt37
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt90
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt46
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt70
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt47
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/Topology.kt33
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt79
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt73
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt67
47 files changed, 3006 insertions, 39 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index b19ce750..e19784ba 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -32,9 +32,8 @@ plugins {
}
dependencies {
- api(projects.opendcCompute.opendcComputeWorkload)
+ api(projects.opendcExperiments.opendcExperimentsCompute)
- implementation(projects.opendcExperiments.opendcExperimentsCompute)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcCompute.opendcComputeSimulator)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index 378c3833..db56f75d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -28,11 +28,9 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.workload.*
-import org.opendc.compute.workload.topology.Topology
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.compute.setupComputeService
-import org.opendc.experiments.compute.setupHosts
+import org.opendc.experiments.compute.*
+import org.opendc.experiments.compute.topology.Topology
import org.opendc.experiments.provisioner.Provisioner
import org.opendc.simulator.core.runBlockingSimulation
import org.openjdk.jmh.annotations.*
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
index 6bd470f3..e019af34 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
@@ -23,14 +23,10 @@
package org.opendc.experiments.capelin
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.createComputeScheduler
-import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor
-import org.opendc.compute.workload.grid5000
-import org.opendc.compute.workload.replay
import org.opendc.experiments.capelin.model.Scenario
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.experiments.compute.*
+import org.opendc.experiments.compute.export.parquet.ParquetComputeMonitor
import org.opendc.experiments.provisioner.Provisioner
import org.opendc.simulator.core.runBlockingSimulation
import java.io.File
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index a2e71243..ed2588f0 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.model
-import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.experiments.compute.ComputeWorkload
/**
* A single workload originating from a trace.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt
index 68eb15b3..80b8859c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/CompositeWorkloadPortfolio.kt
@@ -22,12 +22,12 @@
package org.opendc.experiments.capelin.portfolio
-import org.opendc.compute.workload.composite
-import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Scenario
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.compute.composite
+import org.opendc.experiments.compute.trace
/**
* A [Portfolio] that explores the effect of a composite workload.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt
index 0d7f3072..f3c002ac 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/HorVerPortfolio.kt
@@ -22,12 +22,12 @@
package org.opendc.experiments.capelin.portfolio
-import org.opendc.compute.workload.sampleByLoad
-import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Scenario
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.compute.sampleByLoad
+import org.opendc.experiments.compute.trace
/**
* A [Portfolio] that explores the difference between horizontal and vertical scaling.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt
index 6afffc09..22f9f3ac 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreHpcPortfolio.kt
@@ -22,13 +22,13 @@
package org.opendc.experiments.capelin.portfolio
-import org.opendc.compute.workload.sampleByHpc
-import org.opendc.compute.workload.sampleByHpcLoad
-import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Scenario
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.compute.sampleByHpc
+import org.opendc.experiments.compute.sampleByHpcLoad
+import org.opendc.experiments.compute.trace
/**
* A [Portfolio] to explore the effect of HPC workloads.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt
index 92bf80b3..e63a5807 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/MoreVelocityPortfolio.kt
@@ -22,12 +22,12 @@
package org.opendc.experiments.capelin.portfolio
-import org.opendc.compute.workload.sampleByLoad
-import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Scenario
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.compute.sampleByLoad
+import org.opendc.experiments.compute.trace
/**
* A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines).
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt
index f9a9d681..12570108 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/OperationalPhenomenaPortfolio.kt
@@ -22,12 +22,12 @@
package org.opendc.experiments.capelin.portfolio
-import org.opendc.compute.workload.sampleByLoad
-import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Scenario
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.compute.sampleByLoad
+import org.opendc.experiments.compute.trace
/**
* A [Portfolio] that explores the effect of operational phenomena on metrics.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt
index 944e9f43..6f126b87 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/portfolio/TestPortfolio.kt
@@ -22,11 +22,11 @@
package org.opendc.experiments.capelin.portfolio
-import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Scenario
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.compute.trace
/**
* A [Portfolio] to perform a simple test run.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
index 5ab4261a..d152a6db 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -23,8 +23,8 @@
@file:JvmName("TopologyFactories")
package org.opendc.experiments.capelin.topology
-import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.compute.workload.topology.Topology
+import org.opendc.experiments.compute.topology.HostSpec
+import org.opendc.experiments.compute.topology.Topology
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 3407e30f..ae1ca807 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,13 +32,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.ComputeMonitor
-import org.opendc.compute.workload.telemetry.table.HostTableReader
-import org.opendc.compute.workload.telemetry.table.ServiceTableReader
-import org.opendc.compute.workload.topology.Topology
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.experiments.compute.*
+import org.opendc.experiments.compute.telemetry.ComputeMonitor
+import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
+import org.opendc.experiments.compute.topology.Topology
import org.opendc.experiments.provisioner.Provisioner
import org.opendc.simulator.core.runBlockingSimulation
import java.io.File
diff --git a/opendc-experiments/opendc-experiments-compute/build.gradle.kts b/opendc-experiments/opendc-experiments-compute/build.gradle.kts
index 70f16199..5cae1d43 100644
--- a/opendc-experiments/opendc-experiments-compute/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-compute/build.gradle.kts
@@ -30,6 +30,16 @@ plugins {
}
dependencies {
+ api(projects.opendcCompute.opendcComputeService)
api(projects.opendcExperiments.opendcExperimentsBase)
- api(projects.opendcCompute.opendcComputeWorkload)
+ api(projects.opendcCompute.opendcComputeSimulator)
+
+ implementation(projects.opendcTrace.opendcTraceApi)
+ implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcSimulator.opendcSimulatorCore)
+ implementation(projects.opendcSimulator.opendcSimulatorCompute)
+
+ implementation(libs.kotlin.logging)
+
+ testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
new file mode 100644
index 00000000..1731a4ac
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSchedulers.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+package org.opendc.experiments.compute
+
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (name) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(placements)
+ else -> throw IllegalArgumentException("Unknown policy $name")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt
index ce36fac8..3ae4b0df 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeSteps.kt
@@ -25,9 +25,9 @@ package org.opendc.experiments.compute
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.workload.telemetry.ComputeMetricReader
-import org.opendc.compute.workload.telemetry.ComputeMonitor
-import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.experiments.compute.telemetry.ComputeMonitor
+import org.opendc.experiments.compute.telemetry.ComputeMonitorProvisioningStep
+import org.opendc.experiments.compute.topology.HostSpec
import org.opendc.experiments.provisioner.ProvisioningContext
import org.opendc.experiments.provisioner.ProvisioningStep
import java.time.Duration
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
new file mode 100644
index 00000000..3db980ca
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkload.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute
+
+import java.util.*
+
+/**
+ * An interface that describes how a workload is resolved.
+ */
+public interface ComputeWorkload {
+ /**
+ * Resolve the workload into a list of [VirtualMachine]s to simulate.
+ */
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
new file mode 100644
index 00000000..f92e10e3
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute
+
+import mu.KotlinLogging
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.trace.*
+import org.opendc.trace.conv.*
+import java.io.File
+import java.lang.ref.SoftReference
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.math.max
+import kotlin.math.roundToLong
+
+/**
+ * A helper class for loading compute workload traces into memory.
+ *
+ * @param baseDir The directory containing the traces.
+ */
+public class ComputeWorkloadLoader(private val baseDir: File) {
+ /**
+ * The logger for this instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The cache of workloads.
+ */
+ private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>()
+
+ /**
+ * Read the fragments into memory.
+ */
+ private fun parseFragments(trace: Trace): Map<String, Builder> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+
+ val idCol = reader.resolve(RESOURCE_ID)
+ val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
+ val durationCol = reader.resolve(RESOURCE_STATE_DURATION)
+ val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE)
+
+ val fragments = mutableMapOf<String, Builder>()
+
+ return try {
+ while (reader.nextRow()) {
+ val id = reader.getString(idCol)!!
+ val time = reader.getInstant(timestampCol)!!
+ val duration = reader.getDuration(durationCol)!!
+ val cores = reader.getInt(coresCol)
+ val cpuUsage = reader.getDouble(usageCol)
+
+ val deadlineMs = time.toEpochMilli()
+ val timeMs = (time - duration).toEpochMilli()
+ val builder = fragments.computeIfAbsent(id) { Builder() }
+ builder.add(timeMs, deadlineMs, cpuUsage, cores)
+ }
+
+ fragments
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Read the metadata into a workload.
+ */
+ private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
+
+ val idCol = reader.resolve(RESOURCE_ID)
+ val startTimeCol = reader.resolve(RESOURCE_START_TIME)
+ val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
+ val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY)
+ val memCol = reader.resolve(RESOURCE_MEM_CAPACITY)
+
+ var counter = 0
+ val entries = mutableListOf<VirtualMachine>()
+
+ return try {
+ while (reader.nextRow()) {
+
+ val id = reader.getString(idCol)!!
+ if (!fragments.containsKey(id)) {
+ continue
+ }
+
+ val submissionTime = reader.getInstant(startTimeCol)!!
+ val endTime = reader.getInstant(stopTimeCol)!!
+ val cpuCount = reader.getInt(cpuCountCol)
+ val cpuCapacity = reader.getDouble(cpuCapacityCol)
+ val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
+ val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
+
+ val builder = fragments.getValue(id)
+ val totalLoad = builder.totalLoad
+
+ entries.add(
+ VirtualMachine(
+ uid,
+ id,
+ cpuCount,
+ cpuCapacity,
+ memCapacity.roundToLong(),
+ totalLoad,
+ submissionTime,
+ endTime,
+ builder.build(),
+ interferenceModel.getProfile(id)
+ )
+ )
+ }
+
+ // Make sure the virtual machines are ordered by start time
+ entries.sortBy { it.startTime }
+
+ entries
+ } catch (e: Exception) {
+ e.printStackTrace()
+ throw e
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Read the interference model associated with the specified [trace].
+ */
+ private fun parseInterferenceModel(trace: Trace): VmInterferenceModel {
+ val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader()
+
+ return try {
+ val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS)
+ val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET)
+ val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE)
+
+ val modelBuilder = VmInterferenceModel.builder()
+
+ while (reader.nextRow()) {
+ val members = reader.getSet(membersCol, String::class.java)!!
+ val target = reader.getDouble(targetCol)
+ val score = reader.getDouble(scoreCol)
+
+ modelBuilder
+ .addGroup(members, target, score)
+ }
+
+ modelBuilder.build()
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Load the trace with the specified [name] and [format].
+ */
+ public fun get(name: String, format: String): List<VirtualMachine> {
+ val ref = cache.compute(name) { key, oldVal ->
+ val inst = oldVal?.get()
+ if (inst == null) {
+
+ val path = baseDir.resolve(key)
+
+ logger.info { "Loading trace $key at $path" }
+
+ val trace = Trace.open(path, format)
+ val fragments = parseFragments(trace)
+ val interferenceModel = parseInterferenceModel(trace)
+ val vms = parseMeta(trace, fragments, interferenceModel)
+
+ SoftReference(vms)
+ } else {
+ oldVal
+ }
+ }
+
+ return checkNotNull(ref?.get()) { "Memory pressure" }
+ }
+
+ /**
+ * Clear the workload cache.
+ */
+ public fun reset() {
+ cache.clear()
+ }
+
+ /**
+ * A builder for a VM trace.
+ */
+ private class Builder {
+ /**
+ * The total load of the trace.
+ */
+ @JvmField var totalLoad: Double = 0.0
+
+ /**
+ * The internal builder for the trace.
+ */
+ private val builder = SimTrace.builder()
+
+ /**
+ * The deadline of the previous fragment.
+ */
+ private var previousDeadline = Long.MIN_VALUE
+
+ /**
+ * Add a fragment to the trace.
+ *
+ * @param timestamp Timestamp at which the fragment starts (in epoch millis).
+ * @param deadline Timestamp at which the fragment ends (in epoch millis).
+ * @param usage CPU usage of this fragment.
+ * @param cores Number of cores used.
+ */
+ fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
+ val duration = max(0, deadline - timestamp)
+ totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
+
+ if (timestamp != previousDeadline) {
+ // There is a gap between the previous and current fragment; fill the gap
+ builder.add(timestamp, 0.0, cores)
+ }
+
+ builder.add(deadline, usage, cores)
+ previousDeadline = deadline
+ }
+
+ /**
+ * Build the trace.
+ */
+ fun build(): SimTrace = builder.build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt
new file mode 100644
index 00000000..732f761e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloads.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeWorkloads")
+package org.opendc.experiments.compute
+
+import org.opendc.experiments.compute.internal.CompositeComputeWorkload
+import org.opendc.experiments.compute.internal.HpcSampledComputeWorkload
+import org.opendc.experiments.compute.internal.LoadSampledComputeWorkload
+import org.opendc.experiments.compute.internal.TraceComputeWorkload
+
+/**
+ * Construct a workload from a trace.
+ */
+public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format)
+
+/**
+ * Construct a composite workload with the specified fractions.
+ */
+public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload {
+ return CompositeComputeWorkload(pairs.toMap())
+}
+
+/**
+ * Sample a workload by a [fraction] of the total load.
+ */
+public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload {
+ return LoadSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC VMs (count)
+ */
+public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC load
+ */
+public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction, sampleLoad = true)
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
new file mode 100644
index 00000000..f96b7e16
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModel.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
+ */
+public interface FailureModel {
+ /**
+ * Construct a [HostFaultInjector] for the specified [service].
+ */
+ public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
new file mode 100644
index 00000000..00bf44a1
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/FailureModels.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("FailureModels")
+package org.opendc.experiments.compute
+
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
+import java.time.Clock
+import java.time.Duration
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+
+/**
+ * Obtain a [FailureModel] based on the GRID'5000 failure trace.
+ *
+ * This fault injector uses parameters from the GRID'5000 failure trace as described in
+ * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
+ */
+public fun grid5000(failureInterval: Duration): FailureModel {
+ return object : FailureModel {
+ override fun createInjector(
+ context: CoroutineContext,
+ clock: Clock,
+ service: ComputeService,
+ random: Random
+ ): HostFaultInjector {
+ val rng = Well19937c(random.nextLong())
+ val hosts = service.hosts.map { it as SimHost }.toSet()
+
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ // GRID'5000
+ return HostFaultInjector(
+ context,
+ clock,
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+ )
+ }
+
+ override fun toString(): String = "Grid5000FailureModel"
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
index 6aca4ab7..28c9bc01 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.compute
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.experiments.compute.topology.HostSpec
import org.opendc.experiments.provisioner.ProvisioningContext
import org.opendc.experiments.provisioner.ProvisioningStep
import org.opendc.simulator.compute.SimBareMetalMachine
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
new file mode 100644
index 00000000..0df9305a
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TraceHelpers")
+package org.opendc.experiments.compute
+
+import kotlinx.coroutines.*
+import org.opendc.compute.service.ComputeService
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.coroutineContext
+import kotlin.math.max
+
+/**
+ * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished.
+ *
+ * @param clock The simulation clock.
+ * @param trace The trace to simulate.
+ * @param seed The seed to use for randomness.
+ * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
+ * @param failureModel A failure model to use for injecting failures.
+ * @param interference A flag to indicate that VM interference needs to be enabled.
+ */
+public suspend fun ComputeService.replay(
+ clock: Clock,
+ trace: List<VirtualMachine>,
+ seed: Long,
+ submitImmediately: Boolean = false,
+ failureModel: FailureModel? = null,
+ interference: Boolean = false
+) {
+ val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed))
+ val client = newClient()
+
+ // Create new image for the virtual machine
+ val image = client.newImage("vm-image")
+
+ try {
+ coroutineScope {
+ // Start the fault injector
+ injector?.start()
+
+ var offset = Long.MIN_VALUE
+
+ for (entry in trace.sortedBy { it.startTime }) {
+ val now = clock.millis()
+ val start = entry.startTime.toEpochMilli()
+
+ if (offset < 0) {
+ offset = start - now
+ }
+
+ // Make sure the trace entries are ordered by submission time
+ assert(start - offset >= 0) { "Invalid trace order" }
+
+ if (!submitImmediately) {
+ delay(max(0, (start - offset) - now))
+ }
+
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
+ val meta = mutableMapOf<String, Any>("workload" to workload)
+
+ val interferenceProfile = entry.interferenceProfile
+ if (interference && interferenceProfile != null) {
+ meta["interference-profile"] = interferenceProfile
+ }
+
+ launch {
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.cpuCount,
+ entry.memCapacity,
+ meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
+ ),
+ meta = meta
+ )
+
+ // Wait for the server reach its end time
+ val endTime = entry.stopTime.toEpochMilli()
+ delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
+
+ // Stop the server after reaching the end-time of the virtual machine
+ server.stop()
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ injector?.close()
+ client.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt
new file mode 100644
index 00000000..3ed497a0
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/VirtualMachine.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute
+
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile
+import org.opendc.simulator.compute.workload.SimTrace
+import java.time.Instant
+import java.util.*
+
+/**
+ * A virtual machine workload.
+ *
+ * @param uid The unique identifier of the virtual machine.
+ * @param name The name of the virtual machine.
+ * @param cpuCapacity The required CPU capacity for the VM in MHz.
+ * @param cpuCount The number of vCPUs in the VM.
+ * @param memCapacity The provisioned memory for the VM in MB.
+ * @param startTime The start time of the VM.
+ * @param stopTime The stop time of the VM.
+ * @param trace The trace that belong to this VM.
+ * @param interferenceProfile The interference profile of this virtual machine.
+ */
+public data class VirtualMachine(
+ val uid: UUID,
+ val name: String,
+ val cpuCount: Int,
+ val cpuCapacity: Double,
+ val memCapacity: Long,
+ val totalLoad: Double,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val trace: SimTrace,
+ val interferenceProfile: VmInterferenceProfile?
+)
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt
new file mode 100644
index 00000000..a104851f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetComputeMonitor.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.opendc.experiments.compute.telemetry.ComputeMonitor
+import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import org.opendc.experiments.compute.telemetry.table.ServerTableReader
+import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
+import java.io.File
+
+/**
+ * A [ComputeMonitor] that logs the events to a Parquet file.
+ */
+public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val serverWriter = ParquetServerDataWriter(
+ File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ private val hostWriter = ParquetHostDataWriter(
+ File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ private val serviceWriter = ParquetServiceDataWriter(
+ File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ override fun record(reader: ServerTableReader) {
+ serverWriter.write(reader)
+ }
+
+ override fun record(reader: HostTableReader) {
+ hostWriter.write(reader)
+ }
+
+ override fun record(reader: ServiceTableReader) {
+ serviceWriter.write(reader)
+ }
+
+ override fun close() {
+ hostWriter.close()
+ serviceWriter.close()
+ serverWriter.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt
new file mode 100644
index 00000000..60629a95
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetDataWriter.kt
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import mu.KotlinLogging
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * A writer that writes data in Parquet format.
+ *
+ * @param path The path to the file to write the data to.
+ * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
+ */
+public abstract class ParquetDataWriter<in T>(
+ path: File,
+ private val writeSupport: WriteSupport<T>,
+ bufferSize: Int = 4096
+) : AutoCloseable {
+ /**
+ * The logging instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The queue of records to process.
+ */
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * An exception to be propagated to the actual writer.
+ */
+ private var exception: Throwable? = null
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread = thread(start = false, name = this.toString()) {
+ val writer = let {
+ val builder = LocalParquetWriter.builder(path.toPath(), writeSupport)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
+
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
+
+ try {
+ while (!shouldStop) {
+ try {
+ writer.write(queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
+
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ writer.write(data)
+ }
+ buf.clear()
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Build the [ParquetWriter] used to write the Parquet files.
+ */
+ protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
+ return builder.build()
+ }
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(data: T) {
+ val exception = exception
+ if (exception != null) {
+ throw IllegalStateException("Writer thread failed", exception)
+ }
+
+ queue.put(data)
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ override fun close() {
+ writerThread.interrupt()
+ writerThread.join()
+ }
+
+ init {
+ writerThread.start()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
new file mode 100644
index 00000000..cf0a3bf2
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.io.File
+import java.util.*
+
+/**
+ * A Parquet event writer for [HostTableReader]s.
+ */
+public class ParquetHostDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
+
+ override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
+ return builder
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun toString(): String = "host-writer"
+
+ /**
+ * A [WriteSupport] implementation for a [HostTableReader].
+ */
+ private class HostDataWriteSupport : WriteSupport<HostTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: HostTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: HostTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("host_id", 1)
+ consumer.addBinary(UUID.fromString(data.host.id).toBinary())
+ consumer.endField("host_id", 1)
+
+ consumer.startField("uptime", 2)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 2)
+
+ consumer.startField("downtime", 3)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 3)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 4)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 4)
+ }
+
+ consumer.startField("cpu_count", 5)
+ consumer.addInteger(data.host.cpuCount)
+ consumer.endField("cpu_count", 5)
+
+ consumer.startField("cpu_limit", 6)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 6)
+
+ consumer.startField("cpu_time_active", 7)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 7)
+
+ consumer.startField("cpu_time_idle", 8)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 8)
+
+ consumer.startField("cpu_time_steal", 9)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 9)
+
+ consumer.startField("cpu_time_lost", 10)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 10)
+
+ consumer.startField("mem_limit", 11)
+ consumer.addLong(data.host.memCapacity)
+ consumer.endField("mem_limit", 11)
+
+ consumer.startField("power_total", 12)
+ consumer.addDouble(data.powerTotal)
+ consumer.endField("power_total", 12)
+
+ consumer.startField("guests_terminated", 13)
+ consumer.addInteger(data.guestsTerminated)
+ consumer.endField("guests_terminated", 13)
+
+ consumer.startField("guests_running", 14)
+ consumer.addInteger(data.guestsRunning)
+ consumer.endField("guests_running", 14)
+
+ consumer.startField("guests_error", 15)
+ consumer.addInteger(data.guestsError)
+ consumer.endField("guests_error", 15)
+
+ consumer.startField("guests_invalid", 16)
+ consumer.addInteger(data.guestsInvalid)
+ consumer.endField("guests_invalid", 16)
+
+ consumer.endMessage()
+ }
+ }
+
+ private companion object {
+ /**
+ * The schema of the host data.
+ */
+ val SCHEMA: MessageType = Types
+ .buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("power_total"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_terminated"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_running"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_error"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_invalid"),
+ )
+ .named("host")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..1622289e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+import org.opendc.experiments.compute.telemetry.table.ServerTableReader
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.io.File
+import java.util.*
+
+/**
+ * A Parquet event writer for [ServerTableReader]s.
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
+
+ override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
+ return builder
+ .withDictionaryEncoding("server_id", true)
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun toString(): String = "server-writer"
+
+ /**
+ * A [WriteSupport] implementation for a [ServerTableReader].
+ */
+ private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ServerTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: ServerTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("server_id", 1)
+ consumer.addBinary(UUID.fromString(data.server.id).toBinary())
+ consumer.endField("server_id", 1)
+
+ val hostId = data.host?.id
+ if (hostId != null) {
+ consumer.startField("host_id", 2)
+ consumer.addBinary(UUID.fromString(hostId).toBinary())
+ consumer.endField("host_id", 2)
+ }
+
+ consumer.startField("uptime", 3)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 3)
+
+ consumer.startField("downtime", 4)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 4)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 5)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 5)
+ }
+
+ val provisionTime = data.provisionTime
+ if (provisionTime != null) {
+ consumer.startField("provision_time", 6)
+ consumer.addLong(provisionTime.toEpochMilli())
+ consumer.endField("provision_time", 6)
+ }
+
+ consumer.startField("cpu_count", 7)
+ consumer.addInteger(data.server.cpuCount)
+ consumer.endField("cpu_count", 7)
+
+ consumer.startField("cpu_limit", 8)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 8)
+
+ consumer.startField("cpu_time_active", 9)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 9)
+
+ consumer.startField("cpu_time_idle", 10)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 10)
+
+ consumer.startField("cpu_time_steal", 11)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 11)
+
+ consumer.startField("cpu_time_lost", 12)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 12)
+
+ consumer.startField("mem_limit", 13)
+ consumer.addLong(data.server.memCapacity)
+ consumer.endField("mem_limit", 13)
+
+ consumer.endMessage()
+ }
+ }
+
+ private companion object {
+ /**
+ * The schema of the server data.
+ */
+ val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("server_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("provision_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit")
+ )
+ .named("server")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt
new file mode 100644
index 00000000..0c466d39
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServiceDataWriter.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServiceTableReader]s.
+ */
+public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
+
+ override fun toString(): String = "service-writer"
+
+ /**
+ * A [WriteSupport] implementation for a [ServiceTableReader].
+ */
+ private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ServiceTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("hosts_up", 1)
+ consumer.addInteger(data.hostsUp)
+ consumer.endField("hosts_up", 1)
+
+ consumer.startField("hosts_down", 2)
+ consumer.addInteger(data.hostsDown)
+ consumer.endField("hosts_down", 2)
+
+ consumer.startField("servers_pending", 3)
+ consumer.addInteger(data.serversPending)
+ consumer.endField("servers_pending", 3)
+
+ consumer.startField("servers_active", 4)
+ consumer.addInteger(data.serversActive)
+ consumer.endField("servers_active", 4)
+
+ consumer.startField("attempts_success", 5)
+ consumer.addInteger(data.attemptsSuccess)
+ consumer.endField("attempts_pending", 5)
+
+ consumer.startField("attempts_failure", 6)
+ consumer.addInteger(data.attemptsFailure)
+ consumer.endField("attempts_failure", 6)
+
+ consumer.startField("attempts_error", 7)
+ consumer.addInteger(data.attemptsError)
+ consumer.endField("attempts_error", 7)
+
+ consumer.endMessage()
+ }
+ }
+
+ private companion object {
+ private val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_up"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_down"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_pending"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_success"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_failure"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_error"),
+ )
+ .named("service")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt
new file mode 100644
index 00000000..a3f2d597
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/Utils.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import java.nio.ByteBuffer
+import java.util.UUID
+
+/**
+ * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet.
+ */
+internal fun UUID.toBinary(): Binary {
+ val bb = ByteBuffer.allocate(16)
+ bb.putLong(mostSignificantBits)
+ bb.putLong(leastSignificantBits)
+ bb.rewind()
+ return Binary.fromConstantByteBuffer(bb)
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
new file mode 100644
index 00000000..75779088
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/CompositeComputeWorkload.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.internal
+
+import mu.KotlinLogging
+import org.opendc.experiments.compute.ComputeWorkload
+import org.opendc.experiments.compute.ComputeWorkloadLoader
+import org.opendc.experiments.compute.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
+ */
+internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
+
+ val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
+
+ val res = mutableListOf<VirtualMachine>()
+
+ for ((fraction, vms) in traces) {
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+ }
+
+ val vmCount = traces.sumOf { (_, vms) -> vms.size }
+ logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
new file mode 100644
index 00000000..23efb154
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/HpcSampledComputeWorkload.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.internal
+
+import mu.KotlinLogging
+import org.opendc.experiments.compute.ComputeWorkload
+import org.opendc.experiments.compute.ComputeWorkloadLoader
+import org.opendc.experiments.compute.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples HPC VMs in the workload.
+ *
+ * @param fraction The fraction of load/virtual machines to sample
+ * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs.
+ */
+internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The pattern to match compute nodes in the workload.
+ */
+ private val pattern = Regex("^(ComputeNode|cn).*")
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+
+ val (hpc, nonHpc) = vms.partition { entry ->
+ val name = entry.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<VirtualMachine>()
+
+ if (sampleLoad) {
+ var currentLoad = 0.0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * vms.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.totalLoad
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * vms.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.totalLoad
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+
+ /**
+ * Sample a random trace entry.
+ */
+ private fun sample(entry: VirtualMachine, i: Int): VirtualMachine {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
new file mode 100644
index 00000000..4663c59e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/LoadSampledComputeWorkload.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.internal
+
+import mu.KotlinLogging
+import org.opendc.experiments.compute.ComputeWorkload
+import org.opendc.experiments.compute.ComputeWorkloadLoader
+import org.opendc.experiments.compute.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that is sampled based on total load.
+ */
+internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+ val res = mutableListOf<VirtualMachine>()
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
new file mode 100644
index 00000000..1cfee3bd
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/internal/TraceComputeWorkload.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.internal
+
+import org.opendc.experiments.compute.ComputeWorkload
+import org.opendc.experiments.compute.ComputeWorkloadLoader
+import org.opendc.experiments.compute.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] from a trace.
+ */
+internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ return loader.get(name, format)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
new file mode 100644
index 00000000..088f98e9
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
@@ -0,0 +1,427 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.driver.Host
+import org.opendc.experiments.compute.telemetry.table.*
+import java.time.Clock
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
+ * export interval.
+ *
+ * @param scope The [CoroutineScope] to run the reader in.
+ * @param clock The virtual clock.
+ * @param service The [ComputeService] to monitor.
+ * @param monitor The monitor to export the metrics to.
+ * @param exportInterval The export interval.
+ */
+public class ComputeMetricReader(
+ scope: CoroutineScope,
+ clock: Clock,
+ private val service: ComputeService,
+ private val monitor: ComputeMonitor,
+ private val exportInterval: Duration = Duration.ofMinutes(5)
+) : AutoCloseable {
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * Aggregator for service metrics.
+ */
+ private val serviceTableReader = ServiceTableReaderImpl(service)
+
+ /**
+ * Mapping from [Host] instances to [HostTableReaderImpl]
+ */
+ private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>()
+
+ /**
+ * Mapping from [Server] instances to [ServerTableReaderImpl]
+ */
+ private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>()
+
+ /**
+ * The background job that is responsible for collecting the metrics every cycle.
+ */
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
+ val service = service
+ val monitor = monitor
+ val hostTableReaders = hostTableReaders
+ val serverTableReaders = serverTableReaders
+ val serviceTableReader = serviceTableReader
+
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ try {
+ val now = clock.instant()
+
+ for (host in service.hosts) {
+ val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
+ reader.record(now)
+ monitor.record(reader)
+ reader.reset()
+ }
+
+ for (server in service.servers) {
+ val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
+ reader.record(now)
+ monitor.record(reader)
+ reader.reset()
+ }
+
+ serviceTableReader.record(now)
+ monitor.record(serviceTableReader)
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
+ }
+ }
+ } finally {
+ if (monitor is AutoCloseable) {
+ monitor.close()
+ }
+ }
+ }
+
+ override fun close() {
+ job.cancel()
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val hostsUp: Int
+ get() = _hostsUp
+ private var _hostsUp = 0
+
+ override val hostsDown: Int
+ get() = _hostsDown
+ private var _hostsDown = 0
+
+ override val serversPending: Int
+ get() = _serversPending
+ private var _serversPending = 0
+
+ override val serversActive: Int
+ get() = _serversActive
+ private var _serversActive = 0
+
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ private var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ private var _attemptsFailure = 0
+
+ override val attemptsError: Int
+ get() = _attemptsError
+ private var _attemptsError = 0
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ _timestamp = now
+
+ val stats = service.getSchedulerStats()
+ _hostsUp = stats.hostsAvailable
+ _hostsDown = stats.hostsUnavailable
+ _serversPending = stats.serversPending
+ _serversActive = stats.serversActive
+ _attemptsSuccess = stats.attemptsSuccess.toInt()
+ _attemptsFailure = stats.attemptsFailure.toInt()
+ _attemptsError = stats.attemptsError.toInt()
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ private class HostTableReaderImpl(host: Host) : HostTableReader {
+ private val _host = host
+
+ override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity)
+
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ private var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ private var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ private var _guestsError = 0
+
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ private var _guestsInvalid = 0
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ private var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ private var _cpuDemand = 0.0
+
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ private var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val powerUsage: Double
+ get() = _powerUsage
+ private var _powerUsage = 0.0
+
+ override val powerTotal: Double
+ get() = _powerTotal - previousPowerTotal
+ private var _powerTotal = 0.0
+ private var previousPowerTotal = 0.0
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime = 0L
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime = 0L
+ private var previousDowntime = 0L
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val hostCpuStats = _host.getCpuStats()
+ val hostSysStats = _host.getSystemStats()
+
+ _timestamp = now
+ _guestsTerminated = hostSysStats.guestsTerminated
+ _guestsRunning = hostSysStats.guestsRunning
+ _guestsError = hostSysStats.guestsError
+ _guestsInvalid = hostSysStats.guestsInvalid
+ _cpuLimit = hostCpuStats.capacity
+ _cpuDemand = hostCpuStats.demand
+ _cpuUsage = hostCpuStats.usage
+ _cpuUtilization = hostCpuStats.utilization
+ _cpuActiveTime = hostCpuStats.activeTime
+ _cpuIdleTime = hostCpuStats.idleTime
+ _cpuStealTime = hostCpuStats.stealTime
+ _cpuLostTime = hostCpuStats.lostTime
+ _powerUsage = hostSysStats.powerUsage
+ _powerTotal = hostSysStats.energyUsage
+ _uptime = hostSysStats.uptime.toMillis()
+ _downtime = hostSysStats.downtime.toMillis()
+ _bootTime = hostSysStats.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousPowerTotal = _powerTotal
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerUsage = 0.0
+ }
+ }
+
+ /**
+ * An aggregator for server metrics before they are reported.
+ */
+ private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader {
+ private val _server = server
+
+ /**
+ * The static information about this server.
+ */
+ override val server = ServerInfo(
+ server.uid.toString(),
+ server.name,
+ "vm",
+ "x86",
+ server.image.uid.toString(),
+ server.image.name,
+ server.flavor.cpuCount,
+ server.flavor.memorySize
+ )
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ override var host: HostInfo? = null
+ private var _host: Host? = null
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime: Long = 0
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime: Long = 0
+ private var previousDowntime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ private var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val newHost = service.lookupHost(_server)
+ if (newHost != null && newHost.uid != _host?.uid) {
+ _host = newHost
+ host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity)
+ }
+
+ val cpuStats = _host?.getCpuStats(_server)
+ val sysStats = _host?.getSystemStats(_server)
+
+ _timestamp = now
+ _cpuLimit = cpuStats?.capacity ?: 0.0
+ _cpuActiveTime = cpuStats?.activeTime ?: 0
+ _cpuIdleTime = cpuStats?.idleTime ?: 0
+ _cpuStealTime = cpuStats?.stealTime ?: 0
+ _cpuLostTime = cpuStats?.lostTime ?: 0
+ _uptime = sysStats?.uptime?.toMillis() ?: 0
+ _downtime = sysStats?.downtime?.toMillis() ?: 0
+ _provisionTime = _server.launchedAt
+ _bootTime = sysStats?.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+
+ _host = null
+ _cpuLimit = 0.0
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt
new file mode 100644
index 00000000..ff36bef3
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitor.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry
+
+import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import org.opendc.experiments.compute.telemetry.table.ServerTableReader
+import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
+
+/**
+ * A monitor that tracks the metrics and events of the OpenDC Compute service.
+ */
+public interface ComputeMonitor {
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: ServerTableReader) {}
+
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: HostTableReader) {}
+
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: ServiceTableReader) {}
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
index 0be4953b..68ca5ae8 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeMonitorProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
@@ -20,14 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.experiments.compute
+package org.opendc.experiments.compute.telemetry
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.workload.telemetry.ComputeMetricReader
-import org.opendc.compute.workload.telemetry.ComputeMonitor
import org.opendc.experiments.provisioner.ProvisioningContext
import org.opendc.experiments.provisioner.ProvisioningStep
import java.time.Duration
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt
new file mode 100644
index 00000000..84dd7a4f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostInfo.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry.table
+
+/**
+ * Information about a host exposed to the telemetry service.
+ */
+public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long)
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt
new file mode 100644
index 00000000..e6953550
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a host trace entry.
+ */
+public interface HostTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [HostInfo] of the host to which the row belongs to.
+ */
+ public val host: HostInfo
+
+ /**
+ * The number of guests that are in a terminated state.
+ */
+ public val guestsTerminated: Int
+
+ /**
+ * The number of guests that are in a running state.
+ */
+ public val guestsRunning: Int
+
+ /**
+ * The number of guests that are in an error state.
+ */
+ public val guestsError: Int
+
+ /**
+ * The number of guests that are in an unknown state.
+ */
+ public val guestsInvalid: Int
+
+ /**
+ * The capacity of the CPUs in the host (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The usage of all CPUs in the host (in MHz).
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The demand of all vCPUs of the guests (in MHz)
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU utilization of the host.
+ */
+ public val cpuUtilization: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the host.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the host.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+
+ /**
+ * The current power usage of the host in W.
+ */
+ public val powerUsage: Double
+
+ /**
+ * The total power consumption of the host since last time in J.
+ */
+ public val powerTotal: Double
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the host booted.
+ */
+ public val bootTime: Instant?
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt
new file mode 100644
index 00000000..fc360fee
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerInfo.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry.table
+
+/**
+ * Static information about a server exposed to the telemetry service.
+ */
+public data class ServerInfo(
+ val id: String,
+ val name: String,
+ val type: String,
+ val arch: String,
+ val imageId: String,
+ val imageName: String,
+ val cpuCount: Int,
+ val memCapacity: Long
+)
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt
new file mode 100644
index 00000000..c4e2fb4c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a server trace entry.
+ */
+public interface ServerTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [ServerInfo] of the server to which the row belongs to.
+ */
+ public val server: ServerInfo
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted or `null` if it has no host.
+ */
+ public val host: HostInfo?
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the server was enqueued for the scheduler.
+ */
+ public val provisionTime: Instant?
+
+ /**
+ * The [Instant] at which the server booted.
+ */
+ public val bootTime: Instant?
+
+ /**
+ * The capacity of the CPUs of the servers (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the server.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the server.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt
new file mode 100644
index 00000000..394c6bd6
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceData.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * A trace entry for the compute service.
+ */
+public data class ServiceData(
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val serversPending: Int,
+ val serversActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsFailure: Int,
+ val attemptsError: Int
+)
+
+/**
+ * Convert a [ServiceTableReader] into a persistent object.
+ */
+public fun ServiceTableReader.toServiceData(): ServiceData {
+ return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt
new file mode 100644
index 00000000..0155a879
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a service trace entry.
+ */
+public interface ServiceTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The number of hosts that are up at this instant.
+ */
+ public val hostsUp: Int
+
+ /**
+ * The number of hosts that are down at this instant.
+ */
+ public val hostsDown: Int
+
+ /**
+ * The number of servers that are pending to be scheduled.
+ */
+ public val serversPending: Int
+
+ /**
+ * The number of servers that are currently active.
+ */
+ public val serversActive: Int
+
+ /**
+ * The scheduling attempts that were successful.
+ */
+ public val attemptsSuccess: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to client error.
+ */
+ public val attemptsFailure: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to scheduler error.
+ */
+ public val attemptsError: Int
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt
new file mode 100644
index 00000000..c0ac0fbf
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/HostSpec.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.topology
+
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerDriver
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
+import java.util.*
+
+/**
+ * Description of a physical host that will be simulated by OpenDC and host the virtual machines.
+ *
+ * @param uid Unique identifier of the host.
+ * @param name The name of the host.
+ * @param meta The metadata of the host.
+ * @param model The physical model of the machine.
+ * @param powerDriver The [PowerDriver] to model the power consumption of the machine.
+ * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host.
+ */
+public data class HostSpec(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerDriver: PowerDriver,
+ val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer()
+)
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/Topology.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/Topology.kt
new file mode 100644
index 00000000..bd29110f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/topology/Topology.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.topology
+
+/**
+ * Representation of the environment of the compute service, describing the physical details of every host.
+ */
+public interface Topology {
+ /**
+ * Resolve the [Topology] into a list of [HostSpec]s.
+ */
+ public fun resolve(): List<HostSpec>
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt
new file mode 100644
index 00000000..52b94324
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/HostDataWriterTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.experiments.compute.telemetry.table.HostInfo
+import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetHostDataWriter]
+ */
+class HostDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : HostTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val guestsTerminated: Int = 0
+ override val guestsRunning: Int = 0
+ override val guestsError: Int = 0
+ override val guestsInvalid: Int = 0
+ override val cpuLimit: Double = 4096.0
+ override val cpuUsage: Double = 1.0
+ override val cpuDemand: Double = 1.0
+ override val cpuUtilization: Double = 0.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val powerUsage: Double = 1.0
+ override val powerTotal: Double = 1.0
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt
new file mode 100644
index 00000000..0ba93173
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServerDataWriterTest.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.experiments.compute.telemetry.table.HostInfo
+import org.opendc.experiments.compute.telemetry.table.ServerInfo
+import org.opendc.experiments.compute.telemetry.table.ServerTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServerDataWriter]
+ */
+class ServerDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServerTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096)
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val cpuLimit: Double = 4096.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val provisionTime: Instant = timestamp
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt
new file mode 100644
index 00000000..20301185
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-compute/src/test/kotlin/org/opendc/experiments/compute/export/parquet/ServiceDataWriterTest.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.compute.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServiceDataWriter]
+ */
+class ServiceDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServiceTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val hostsUp: Int = 1
+ override val hostsDown: Int = 0
+ override val serversPending: Int = 1
+ override val serversActive: Int = 1
+ override val attemptsSuccess: Int = 1
+ override val attemptsFailure: Int = 0
+ override val attemptsError: Int = 0
+ })
+ }
+ }
+}