summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/main')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt54
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt120
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt35
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt145
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt67
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt101
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt95
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt65
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt23
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt)30
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt121
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt62
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt60
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt139
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt261
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt260
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt32
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt198
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt127
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt149
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt54
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt169
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt46
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt53
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt53
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt49
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt55
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt138
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt212
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt45
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt222
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt97
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt)2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml2
50 files changed, 316 insertions, 3692 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
index faabe5cb..31e8f961 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -22,7 +22,8 @@
package org.opendc.experiments.capelin
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.composite
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
)
override val workload: Workload by anyOf(
- CompositeWorkload(
+ Workload(
"all-azure",
- listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0)
),
- CompositeWorkload(
+ Workload(
"solvinity-25-azure-75",
- listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
),
- CompositeWorkload(
+ Workload(
"solvinity-50-azure-50",
- listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5)
),
- CompositeWorkload(
+ Workload(
"solvinity-75-azure-25",
- listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25)
),
- CompositeWorkload(
+ Workload(
"all-solvinity",
- listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0)
)
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
index e1cf8517..cd093e6c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
index a995e467..73e59a58 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -22,8 +22,10 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByHpc
+import org.opendc.compute.workload.sampleByHpcLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.harness.dsl.anyOf
@@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(1.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
index 49559e0e..9d5717bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
index 1aac4f9e..7ab586b3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena")
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 6261ebbf..630b76c4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -24,16 +24,16 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import mu.KotlinLogging
-import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
-import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.ComputeWorkloadRunner
+import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
+import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.PerformanceInterferenceReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
-import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -43,10 +43,8 @@ import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
-import java.io.FileInputStream
import java.time.Duration
import java.util.*
-import java.util.concurrent.ConcurrentHashMap
import kotlin.math.roundToLong
/**
@@ -91,33 +89,19 @@ abstract class Portfolio(name: String) : Experiment(name) {
abstract val allocationPolicy: String
/**
- * A map of trace readers.
+ * A helper class to load workload traces.
*/
- private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
+ private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path")))
/**
* Perform a single trial for this portfolio.
*/
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val workload = workload
- val workloadNames = if (workload is CompositeWorkload) {
- workload.workloads.map { it.name }
- } else {
- listOf(workload.name)
- }
- val rawReaders = workloadNames.map { workloadName ->
- traceReaders.computeIfAbsent(workloadName) {
- logger.info { "Loading trace $workloadName" }
- RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
- }
- }
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
- .read(FileInputStream(config.getString("interference-model")))
+ .read(File(config.getString("interference-model")))
.let { VmInterferenceModel(it, Random(seeder.nextLong())) }
else
null
@@ -125,14 +109,13 @@ abstract class Portfolio(name: String) : Experiment(name) {
val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
val failureModel =
if (operationalPhenomena.failureFrequency > 0)
- grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
- val simulator = ComputeServiceSimulator(
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler,
- environment.read(),
failureModel,
performanceInterferenceModel
)
@@ -142,17 +125,22 @@ abstract class Portfolio(name: String) : Experiment(name) {
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
+ val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
try {
- simulator.run(trace)
+ // Instantiate the desired topology
+ runner.apply(topology)
+
+ // Run the workload trace
+ runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0])
logger.debug {
"Scheduler " +
"Success=${monitorResults.attemptsSuccess} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
index b6d3b30c..17ec48d4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
index 90840db8..98eb989d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
deleted file mode 100644
index babd8ada..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.env
-
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
-import java.io.File
-import java.io.FileInputStream
-import java.io.InputStream
-import java.util.*
-
-/**
- * A [EnvironmentReader] for the internal environment format.
- *
- * @param input The input stream describing the physical cluster.
- */
-class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader {
- /**
- * Construct a [ClusterEnvironmentReader] for the specified [file].
- */
- constructor(file: File) : this(FileInputStream(file))
-
- override fun read(): List<MachineDef> {
- var clusterIdCol = 0
- var speedCol = 0
- var numberOfHostsCol = 0
- var memoryPerHostCol = 0
- var coresPerHostCol = 0
-
- var clusterIdx = 0
- var clusterId: String
- var speed: Double
- var numberOfHosts: Int
- var memoryPerHost: Long
- var coresPerHost: Int
-
- val nodes = mutableListOf<MachineDef>()
- val random = Random(0)
-
- input.bufferedReader().use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the file
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEachIndexed { idx, line ->
- val values = line.split(";")
-
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- clusterIdCol = header["ClusterID"]!!
- speedCol = header["Speed"]!!
- numberOfHostsCol = header["numberOfHosts"]!!
- memoryPerHostCol = header["memoryCapacityPerHost"]!!
- coresPerHostCol = header["coreCountPerHost"]!!
- return@forEachIndexed
- }
-
- clusterIdx++
- clusterId = values[clusterIdCol].trim()
- speed = values[speedCol].trim().toDouble() * 1000.0
- numberOfHosts = values[numberOfHostsCol].trim().toInt()
- memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
- coresPerHost = values[coresPerHostCol].trim().toInt()
-
- val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
- val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
-
- repeat(numberOfHosts) {
- nodes.add(
- MachineDef(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$it",
- mapOf("cluster" to clusterId),
- MachineModel(
- List(coresPerHost) { coreId ->
- ProcessingUnit(unknownProcessingNode, coreId, speed)
- },
- listOf(unknownMemoryUnit)
- ),
- // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
- // power draw of 350W.
- // Source: https://stackoverflow.com/questions/6128960
- LinearPowerModel(350.0, idlePower = 200.0)
- )
- )
- }
- }
- }
-
- return nodes
- }
-
- override fun close() {
- input.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
deleted file mode 100644
index a968b043..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.env
-
-import java.io.Closeable
-
-/**
- * An interface for reading descriptions of topology environments into memory.
- */
-public interface EnvironmentReader : Closeable {
- /**
- * Read the environment into a list.
- */
- public fun read(): List<MachineDef>
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
deleted file mode 100644
index a4676f31..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-internal fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
deleted file mode 100644
index e3d15c3b..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import mu.KotlinLogging
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetFileWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A writer that writes data in Parquet format.
- */
-abstract class ParquetDataWriter<in T>(
- path: File,
- private val schema: Schema,
- bufferSize: Int = 4096
-) : AutoCloseable {
- /**
- * The logging instance to use.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The queue of commands to process.
- */
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
-
- /**
- * An exception to be propagated to the actual writer.
- */
- private var exception: Throwable? = null
-
- /**
- * The thread that is responsible for writing the Parquet records.
- */
- private val writerThread = thread(start = false, name = this.toString()) {
- val writer = let {
- val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- buildWriter(builder)
- }
-
- val queue = queue
- val buf = mutableListOf<T>()
- var shouldStop = false
-
- try {
- while (!shouldStop) {
- try {
- process(writer, queue.take())
- } catch (e: InterruptedException) {
- shouldStop = true
- }
-
- if (queue.drainTo(buf) > 0) {
- for (data in buf) {
- process(writer, data)
- }
- buf.clear()
- }
- }
- } catch (e: Throwable) {
- logger.error(e) { "Failure in Parquet data writer" }
- exception = e
- } finally {
- writer.close()
- }
- }
-
- /**
- * Build the [ParquetWriter] used to write the Parquet files.
- */
- protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
- return builder.build()
- }
-
- /**
- * Convert the specified [data] into a Parquet record.
- */
- protected abstract fun convert(builder: GenericRecordBuilder, data: T)
-
- /**
- * Write the specified metrics to the database.
- */
- fun write(data: T) {
- val exception = exception
- if (exception != null) {
- throw IllegalStateException("Writer thread failed", exception)
- }
-
- queue.put(data)
- }
-
- /**
- * Signal the writer to stop.
- */
- override fun close() {
- writerThread.interrupt()
- writerThread.join()
- }
-
- init {
- writerThread.start()
- }
-
- /**
- * Process the specified [data] to be written to the Parquet file.
- */
- private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- writer.write(builder.build())
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
deleted file mode 100644
index b057e932..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
-import java.io.File
-
-/**
- * A [ComputeMonitor] that logs the events to a Parquet file.
- */
-class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
- private val serverWriter = ParquetServerDataWriter(
- File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- private val hostWriter = ParquetHostDataWriter(
- File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- private val serviceWriter = ParquetServiceDataWriter(
- File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- override fun record(data: ServerData) {
- serverWriter.write(data)
- }
-
- override fun record(data: HostData) {
- hostWriter.write(data)
- }
-
- override fun record(data: ServiceData) {
- serviceWriter.write(data)
- }
-
- override fun close() {
- hostWriter.close()
- serviceWriter.close()
- serverWriter.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
deleted file mode 100644
index 58388cb1..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.HostData
-import java.io.File
-
-/**
- * A Parquet event writer for [HostData]s.
- */
-public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
-
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
- return builder
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun convert(builder: GenericRecordBuilder, data: HostData) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
-
- builder["host_id"] = data.host.id
-
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- if (bootTime != null) {
- builder["boot_time"] = bootTime.toEpochMilli()
- }
-
- builder["cpu_count"] = data.host.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
-
- builder["mem_limit"] = data.host.memCapacity
-
- builder["power_total"] = data.powerTotal
-
- builder["guests_terminated"] = data.guestsTerminated
- builder["guests_running"] = data.guestsRunning
- builder["guests_error"] = data.guestsError
- builder["guests_invalid"] = data.guestsInvalid
- }
-
- override fun toString(): String = "host-writer"
-
- companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("host")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .requiredDouble("power_total")
- .requiredInt("guests_terminated")
- .requiredInt("guests_running")
- .requiredInt("guests_error")
- .requiredInt("guests_invalid")
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
deleted file mode 100644
index 43b5f469..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.ServerData
-import java.io.File
-import java.util.*
-
-/**
- * A Parquet event writer for [ServerData]s.
- */
-public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
-
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
- return builder
- .withDictionaryEncoding("server_id", true)
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun convert(builder: GenericRecordBuilder, data: ServerData) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
-
- builder["server_id"] = data.server.id
- builder["host_id"] = data.host?.id
-
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- if (bootTime != null) {
- builder["boot_time"] = bootTime.toEpochMilli()
- }
- builder["scheduling_latency"] = data.schedulingLatency
-
- builder["cpu_count"] = data.server.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
-
- builder["mem_limit"] = data.server.memCapacity
- }
-
- override fun toString(): String = "server-writer"
-
- companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("server")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("server_id").type(UUID_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredLong("scheduling_latency")
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
deleted file mode 100644
index 2928f445..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.export.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecordBuilder
-import org.opendc.telemetry.compute.table.ServiceData
-import java.io.File
-
-/**
- * A Parquet event writer for [ServiceData]s.
- */
-public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
-
- override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
- builder["hosts_up"] = data.hostsUp
- builder["hosts_down"] = data.hostsDown
- builder["servers_pending"] = data.serversPending
- builder["servers_active"] = data.serversActive
- builder["attempts_success"] = data.attemptsSuccess
- builder["attempts_failure"] = data.attemptsFailure
- builder["attempts_error"] = data.attemptsError
- }
-
- override fun toString(): String = "service-writer"
-
- companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("service")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("hosts_up")
- .requiredInt("hosts_down")
- .requiredInt("servers_pending")
- .requiredInt("servers_active")
- .requiredInt("attempts_success")
- .requiredInt("attempts_failure")
- .requiredInt("attempts_error")
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index c4ddd158..a2e71243 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,23 +22,12 @@
package org.opendc.experiments.capelin.model
-public enum class SamplingStrategy {
- REGULAR,
- HPC,
- HPC_LOAD
-}
+import org.opendc.compute.workload.ComputeWorkload
/**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
- public open val name: String,
- public val fraction: Double,
- public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
+ * A single workload originating from a trace.
+ *
+ * @param name the name of the workload.
+ * @param source The source of the workload data.
*/
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
- Workload(name, -1.0)
+data class Workload(val name: String, val source: ComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
index b0c0318f..b8b65d28 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpec.kt
@@ -20,19 +20,27 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.env
-
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.power.PowerModel
-import java.util.*
+package org.opendc.experiments.capelin.topology
/**
- * A definition of a machine in a cluster.
+ * Definition of a compute cluster modeled in the simulation.
+ *
+ * @param id A unique identifier representing the compute cluster.
+ * @param name The name of the cluster.
+ * @param cpuCount The total number of CPUs in the cluster.
+ * @param cpuSpeed The speed of a CPU in the cluster in MHz.
+ * @param memCapacity The total memory capacity of the cluster (in MiB).
+ * @param hostCount The number of hosts in the cluster.
+ * @param memCapacityPerHost The memory capacity per host in the cluster (MiB).
+ * @param cpuCountPerHost The number of CPUs per host in the cluster.
*/
-public data class MachineDef(
- val uid: UUID,
+public data class ClusterSpec(
+ val id: String,
val name: String,
- val meta: Map<String, Any>,
- val model: MachineModel,
- val powerModel: PowerModel
+ val cpuCount: Int,
+ val cpuSpeed: Double,
+ val memCapacity: Double,
+ val hostCount: Int,
+ val memCapacityPerHost: Double,
+ val cpuCountPerHost: Int
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
new file mode 100644
index 00000000..5a175f2c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/ClusterSpecReader.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.topology
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.MappingIterator
+import com.fasterxml.jackson.databind.ObjectReader
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A helper class for reading a cluster specification file.
+ */
+class ClusterSpecReader {
+ /**
+ * The [CsvMapper] to map the environment file to an object.
+ */
+ private val mapper = CsvMapper()
+
+ /**
+ * The [ObjectReader] to convert the lines into objects.
+ */
+ private val reader: ObjectReader = mapper.readerFor(Entry::class.java).with(schema)
+
+ /**
+ * Read the specified [file].
+ */
+ fun read(file: File): List<ClusterSpec> {
+ return reader.readValues<Entry>(file).use { read(it) }
+ }
+
+ /**
+ * Read the specified [input].
+ */
+ fun read(input: InputStream): List<ClusterSpec> {
+ return reader.readValues<Entry>(input).use { read(it) }
+ }
+
+ /**
+ * Convert the specified [MappingIterator] into a list of [ClusterSpec]s.
+ */
+ private fun read(it: MappingIterator<Entry>): List<ClusterSpec> {
+ val result = mutableListOf<ClusterSpec>()
+
+ for (entry in it) {
+ val def = ClusterSpec(
+ entry.id,
+ entry.name,
+ entry.cpuCount,
+ entry.cpuSpeed * 1000, // Convert to MHz
+ entry.memCapacity * 1000, // Convert to MiB
+ entry.hostCount,
+ entry.memCapacityPerHost * 1000,
+ entry.cpuCountPerHost
+ )
+ result.add(def)
+ }
+
+ return result
+ }
+
+ private open class Entry(
+ @JsonProperty("ClusterID")
+ val id: String,
+ @JsonProperty("ClusterName")
+ val name: String,
+ @JsonProperty("Cores")
+ val cpuCount: Int,
+ @JsonProperty("Speed")
+ val cpuSpeed: Double,
+ @JsonProperty("Memory")
+ val memCapacity: Double,
+ @JsonProperty("numberOfHosts")
+ val hostCount: Int,
+ @JsonProperty("memoryCapacityPerHost")
+ val memCapacityPerHost: Double,
+ @JsonProperty("coreCountPerHost")
+ val cpuCountPerHost: Int
+ )
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("ClusterID", CsvSchema.ColumnType.STRING)
+ .addColumn("ClusterName", CsvSchema.ColumnType.STRING)
+ .addColumn("Cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Speed", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory", CsvSchema.ColumnType.NUMBER)
+ .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER)
+ .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER)
+ .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setColumnSeparator(';')
+ .setUseHeader(true)
+ .build()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
new file mode 100644
index 00000000..5ab4261a
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/topology/TopologyFactories.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TopologyFactories")
+package org.opendc.experiments.capelin.topology
+
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import java.io.File
+import java.io.InputStream
+import java.util.*
+import kotlin.math.roundToLong
+
+/**
+ * A [ClusterSpecReader] that is used to read the cluster definition file.
+ */
+private val reader = ClusterSpecReader()
+
+/**
+ * Construct a [Topology] from the specified [file].
+ */
+fun clusterTopology(
+ file: File,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(file), powerModel, random)
+
+/**
+ * Construct a [Topology] from the specified [input].
+ */
+fun clusterTopology(
+ input: InputStream,
+ powerModel: PowerModel = LinearPowerModel(350.0, idlePower = 200.0),
+ random: Random = Random(0)
+): Topology = clusterTopology(reader.read(input), powerModel, random)
+
+/**
+ * Construct a [Topology] from the given list of [clusters].
+ */
+fun clusterTopology(
+ clusters: List<ClusterSpec>,
+ powerModel: PowerModel,
+ random: Random = Random(0)
+): Topology {
+ return object : Topology {
+ override fun resolve(): List<HostSpec> {
+ val hosts = mutableListOf<HostSpec>()
+ for (cluster in clusters) {
+ val cpuSpeed = cluster.cpuSpeed
+ val memoryPerHost = cluster.memCapacityPerHost.roundToLong()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cluster.cpuCountPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val machineModel = MachineModel(
+ List(cluster.cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) },
+ listOf(unknownMemoryUnit)
+ )
+
+ repeat(cluster.hostCount) {
+ val spec = HostSpec(
+ UUID(random.nextLong(), it.toLong()),
+ "node-${cluster.name}-$it",
+ mapOf("cluster" to cluster.id),
+ machineModel,
+ SimplePowerDriver(powerModel)
+ )
+
+ hosts += spec
+ }
+ }
+
+ return hosts
+ }
+
+ override fun toString(): String = "ClusterSpecTopology"
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
deleted file mode 100644
index 0bf4ada6..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param rawReaders The internal raw trace readers to use.
- * @param workload The workload to read.
- * @param seed The seed to use for sampling.
- */
-public class ParquetTraceReader(
- rawReaders: List<RawParquetTraceReader>,
- workload: Workload,
- seed: Int
-) : TraceReader<SimWorkload> {
- /**
- * The iterator over the actual trace.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>> =
- rawReaders
- .map { it.read() }
- .run {
- if (workload is CompositeWorkload) {
- this.zip(workload.workloads)
- } else {
- this.zip(listOf(workload))
- }
- }
- .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) }
- .iterator()
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
deleted file mode 100644
index 9549af42..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
-import java.io.InputStream
-
-/**
- * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- */
-class PerformanceInterferenceReader {
- /**
- * The [ObjectMapper] to use.
- */
- private val mapper = jacksonObjectMapper()
-
- init {
- mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
- }
-
- /**
- * Read the performance interface model from the input.
- */
- fun read(input: InputStream): List<VmInterferenceGroup> {
- return input.use { mapper.readValue(input) }
- }
-
- private data class GroupMixin(
- @JsonProperty("minServerLoad")
- val targetLoad: Double,
- @JsonProperty("performanceScore")
- val score: Double,
- @JsonProperty("vms")
- val members: Set<String>,
- )
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
deleted file mode 100644
index ca937328..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.experiments.capelin.trace.bp.BPTraceFormat
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.*
-import java.io.File
-import java.util.UUID
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param path The directory of the traces.
- */
-class RawParquetTraceReader(private val path: File) {
- /**
- * The [Trace] that represents this trace.
- */
- private val trace = BPTraceFormat().open(path.toURI().toURL())
-
- /**
- * Read the fragments into memory.
- */
- private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
- val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
-
- return try {
- while (reader.nextRow()) {
- val id = reader.get(RESOURCE_STATE_ID)
- val time = reader.get(RESOURCE_STATE_TIMESTAMP)
- val duration = reader.get(RESOURCE_STATE_DURATION)
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
-
- val fragment = SimTraceWorkload.Fragment(
- time.toEpochMilli(),
- duration.toMillis(),
- cpuUsage,
- cores
- )
-
- fragments.getOrPut(id) { mutableListOf() }.add(fragment)
- }
-
- fragments
- } finally {
- reader.close()
- }
- }
-
- /**
- * Read the metadata into a workload.
- */
- private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
-
- var counter = 0
- val entries = mutableListOf<TraceEntry<SimWorkload>>()
-
- return try {
- while (reader.nextRow()) {
-
- val id = reader.get(RESOURCE_ID)
- if (!fragments.containsKey(id)) {
- continue
- }
-
- val submissionTime = reader.get(RESOURCE_START_TIME)
- val endTime = reader.get(RESOURCE_STOP_TIME)
- val maxCores = reader.getInt(RESOURCE_NCPUS)
- val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
- val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
-
- val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val workload = SimTraceWorkload(vmFragments)
- entries.add(
- TraceEntry(
- uid, id, submissionTime.toEpochMilli(), workload,
- mapOf(
- "submit-time" to submissionTime.toEpochMilli(),
- "end-time" to endTime.toEpochMilli(),
- "total-load" to totalLoad,
- "cores" to maxCores,
- "required-memory" to requiredMemory.toLong(),
- "workload" to workload
- )
- )
- )
- }
-
- entries
- } catch (e: Exception) {
- e.printStackTrace()
- throw e
- } finally {
- reader.close()
- }
- }
-
- /**
- * The entries in the trace.
- */
- private val entries: List<TraceEntry<SimWorkload>>
-
- init {
- val fragments = parseFragments()
- entries = parseMeta(fragments)
- }
-
- /**
- * Read the entries in the trace.
- */
- fun read(): List<TraceEntry<SimWorkload>> = entries
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
deleted file mode 100644
index ed82217d..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.filter2.predicate.Statistics
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.util.parquet.LocalInputFile
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
- *
- * @param traceFile The directory of the traces.
- * @param selectedVms The list of VMs to read from the trace.
- */
-class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
- private val logger = KotlinLogging.logger {}
-
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * The intermediate buffer to store the read records in.
- */
- private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
-
- /**
- * An optional filter for filtering the selected VMs
- */
- private val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(
- FilterApi.userDefined(
- FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
- )
- )
- )
-
- /**
- * A poisonous fragment.
- */
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
-
- /**
- * The thread to read the records in.
- */
- private val readerThread = thread(start = true, name = "sc20-reader") {
- val reader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- try {
- while (true) {
- val record = reader.read()
-
- if (record == null) {
- queue.put(poison)
- break
- }
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
- cpuUsage,
- cores
- )
-
- queue.put(id to fragment)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- reader.close()
- }
- }
-
- /**
- * Fill the buffers with the VMs
- */
- private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
- if (!hasNext) {
- return
- }
-
- val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
- queue.drainTo(fragments)
-
- for ((id, fragment) in fragments) {
- if (id == poison.first) {
- hasNext = false
- return
- }
- buffers[id]?.forEach { it.add(fragment) }
- }
- }
-
- /**
- * A flag to indicate whether the reader has more entries.
- */
- private var hasNext: Boolean = true
-
- /**
- * Initialize the reader.
- */
- init {
- val takenIds = mutableSetOf<UUID>()
- val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
-
- val metaReader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- while (true) {
- val record = metaReader.read() ?: break
- val id = record["id"].toString()
- entries[id] = record
- }
-
- metaReader.close()
-
- val selection = selectedVms.ifEmpty { entries.keys }
-
- // Create the entry iterator
- iterator = selection.asSequence()
- .mapNotNull { entries[it] }
- .mapIndexed { index, record ->
- val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
-
- assert(uid !in takenIds)
- takenIds += uid
-
- logger.info { "Processing VM $id" }
-
- val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence {
- var time = submissionTime
- repeat@ while (true) {
- if (externalBuffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
- }
- }
-
- internalBuffer.addAll(externalBuffer)
- externalBuffer.clear()
-
- for (fragment in internalBuffer) {
- yield(fragment)
-
- time += fragment.duration
- if (time >= endTime) {
- break@repeat
- }
- }
-
- internalBuffer.clear()
- }
-
- buffers.remove(id)
- }
- val workload = SimTraceWorkload(fragments)
- val meta = mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
-
- TraceEntry(uid, id, submissionTime, workload, meta)
- }
- .sortedBy { it.start }
- .toList()
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {
- readerThread.interrupt()
- }
-
- private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
- override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
-
- override fun canDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
- }
-
- override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
deleted file mode 100644
index 1f3878eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import com.github.ajalt.clikt.core.CliktCommand
-import com.github.ajalt.clikt.parameters.arguments.argument
-import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.cooccurring
-import com.github.ajalt.clikt.parameters.options.*
-import com.github.ajalt.clikt.parameters.types.*
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat
-import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA
-import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA
-import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
-import org.opendc.trace.*
-import org.opendc.trace.bitbrains.BitbrainsTraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.File
-import java.util.*
-import kotlin.math.max
-import kotlin.math.min
-import kotlin.math.roundToLong
-
-/**
- * A script to convert a trace in text format into a Parquet trace.
- */
-fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
-
-/**
- * Represents the command for converting traces
- */
-class TraceConverterCli : CliktCommand(name = "trace-converter") {
- /**
- * The logger instance for the converter.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The directory where the trace should be stored.
- */
- private val output by option("-O", "--output", help = "path to store the trace")
- .file(canBeFile = false, mustExist = false)
- .defaultLazy { File("output") }
-
- /**
- * The directory where the input trace is located.
- */
- private val input by argument("input", help = "path to the input trace")
- .file(canBeFile = false)
-
- /**
- * The input format of the trace.
- */
- private val format by option("-f", "--format", help = "input format of trace")
- .choice(
- "solvinity" to SvTraceFormat(),
- "bitbrains" to BitbrainsTraceFormat(),
- "azure" to AzureTraceFormat()
- )
- .required()
-
- /**
- * The sampling options.
- */
- private val samplingOptions by SamplingOptions().cooccurring()
-
- override fun run() {
- val metaParquet = File(output, "meta.parquet")
- val traceParquet = File(output, "trace.parquet")
-
- if (metaParquet.exists()) {
- metaParquet.delete()
- }
- if (traceParquet.exists()) {
- traceParquet.delete()
- }
-
- val trace = format.open(input.toURI().toURL())
-
- logger.info { "Building resources table" }
-
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(BP_RESOURCES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enablePageWriteChecksum()
- .build()
-
- val selectedVms = metaWriter.use { convertResources(trace, it) }
-
- logger.info { "Wrote ${selectedVms.size} rows" }
- logger.info { "Building resource states table" }
-
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(BP_RESOURCE_STATES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enableDictionaryEncoding()
- .enablePageWriteChecksum()
- .withBloomFilterEnabled("id", true)
- .withBloomFilterNDV("id", selectedVms.size.toLong())
- .build()
-
- val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
- logger.info { "Wrote $statesCount rows" }
- }
-
- /**
- * Convert the resources table for the trace.
- */
- private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
- val random = samplingOptions?.let { Random(it.seed) }
- val samplingFraction = samplingOptions?.fraction ?: 1.0
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
- var hasNextRow = reader.nextRow()
- val selectedVms = mutableSetOf<String>()
-
- while (hasNextRow) {
- var id: String
- var numCpus = Int.MIN_VALUE
- var memCapacity = Double.MIN_VALUE
- var memUsage = Double.MIN_VALUE
- var startTime = Long.MAX_VALUE
- var stopTime = Long.MIN_VALUE
-
- do {
- id = reader.get(RESOURCE_STATE_ID)
-
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
- startTime = min(startTime, timestamp)
- stopTime = max(stopTime, timestamp)
-
- numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
-
- memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
- if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
- memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
- }
-
- hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
-
- // Sample only a fraction of the VMs
- if (random != null && random.nextDouble() > samplingFraction) {
- continue
- }
-
- val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA)
-
- builder["id"] = id
- builder["submissionTime"] = startTime
- builder["endTime"] = stopTime
- builder["maxCores"] = numCpus
- builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
-
- logger.info { "Selecting VM $id" }
-
- writer.write(builder.build())
- selectedVms.add(id)
- }
-
- return selectedVms
- }
-
- /**
- * Convert the resource states table for the trace.
- */
- private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
- var hasNextRow = reader.nextRow()
- var count = 0
-
- while (hasNextRow) {
- var lastTimestamp = Long.MIN_VALUE
-
- do {
- val id = reader.get(RESOURCE_STATE_ID)
-
- if (id !in selectedVms) {
- hasNextRow = reader.nextRow()
- continue
- }
-
- val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA)
- builder["id"] = id
-
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
- }
-
- val duration = timestamp - lastTimestamp
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val flops = (cpuUsage * duration / 1000.0).roundToLong()
-
- builder["time"] = timestamp
- builder["duration"] = duration
- builder["cores"] = cores
- builder["cpuUsage"] = cpuUsage
- builder["flops"] = flops
-
- writer.write(builder.build())
-
- lastTimestamp = timestamp
- hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
-
- count++
- }
-
- return count
- }
-
- /**
- * Options for sampling the workload trace.
- */
- private class SamplingOptions : OptionGroup() {
- /**
- * The fraction of VMs to sample
- */
- val fraction by option("--sampling-fraction", help = "fraction of the workload to sample")
- .double()
- .restrictTo(0.0001, 1.0)
- .required()
-
- /**
- * The seed for sampling the trace.
- */
- val seed by option("--sampling-seed", help = "seed for sampling the workload")
- .long()
- .default(0)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
deleted file mode 100644
index 303a6a8c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import java.util.UUID
-
-/**
- * An entry in a workload trace.
- *
- * @param uid The unique identifier of the entry.
- * @param name The name of the entry.
- * @param start The start time of the workload.
- * @param workload The workload of the entry.
- * @param meta The meta-data associated with the workload.
- */
-public data class TraceEntry<out T>(
- val uid: UUID,
- val name: String,
- val start: Long,
- val workload: T,
- val meta: Map<String, Any>
-)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
deleted file mode 100644
index 08304edc..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-/**
- * An interface for reading workloads into memory.
- *
- * This interface must guarantee that the entries are delivered in order of submission time.
- *
- * @param T The shape of the workloads supported by this reader.
- */
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index cb32ce88..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- return when {
- workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
- workload.samplingStrategy == SamplingStrategy.HPC ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
- workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
- else ->
- sampleRegularWorkload(trace, workload, workload, seed)
- }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- val fraction = subWorkload.fraction
-
- val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- shuffled.sumOf { it.meta.getValue("total-load") as Double }
- }
- var currentLoad = 0.0
-
- for (entry in shuffled) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- seed: Int,
- sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
- val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
- val random = Random(seed)
-
- val fraction = workload.fraction
- val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- trace.sumOf { it.meta.getValue("total-load") as Double }
- }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<TraceEntry<SimWorkload>>()
-
- if (sampleOnLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * trace.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.meta.getValue("total-load") as Double
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * trace.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.meta.getValue("total-load") as Double
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
deleted file mode 100644
index f98f4b2c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resource state [Table] for the Azure v1 VM traces.
- */
-internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
- /**
- * The partitions that belong to the table.
- */
- private val partitions = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCE_STATES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_CPU_USAGE_PCT
- )
-
- override fun newReader(): TableReader {
- val it = partitions.iterator()
-
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- }
-
- this.delegate = delegate
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
- return if (it.hasNext()) {
- val (_, path) = it.next()
- return AzureResourceStateTableReader(factory.createParser(path.toFile()))
- } else {
- null
- }
- }
-
- override fun toString(): String = "AzureCompositeTableReader"
- }
- }
-
- override fun newReader(partition: String): TableReader {
- val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
- return AzureResourceStateTableReader(factory.createParser(path.toFile()))
- }
-
- override fun toString(): String = "AzureResourceStateTable"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
deleted file mode 100644
index f80c0e82..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.opendc.trace.*
-import java.time.Instant
-
-/**
- * A [TableReader] for the Azure v1 VM resource state table.
- */
-internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- reset()
-
- if (!nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
- "vm id" -> id = parser.text
- "avg cpu" -> cpuUsagePct = parser.doubleValue
- }
- }
-
- return true
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_STATE_ID -> id
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var timestamp: Instant? = null
- private var cpuUsagePct = Double.NaN
-
- /**
- * Reset the state.
- */
- private fun reset() {
- id = null
- timestamp = null
- cpuUsagePct = Double.NaN
- }
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema = CsvSchema.builder()
- .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm id", CsvSchema.ColumnType.STRING)
- .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
deleted file mode 100644
index c9d4f7eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * The resource [Table] for the Azure v1 VM traces.
- */
-internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table {
- override val name: String = TABLE_RESOURCES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
- )
-
- override fun newReader(): TableReader {
- return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile()))
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("No partition $partition")
- }
-
- override fun toString(): String = "AzureResourceTable"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
deleted file mode 100644
index b712b854..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.core.JsonToken
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import org.apache.parquet.example.Paper.schema
-import org.opendc.trace.*
-import java.time.Instant
-
-/**
- * A [TableReader] for the Azure v1 VM resources table.
- */
-internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
- init {
- parser.schema = schema
- }
-
- override fun nextRow(): Boolean {
- reset()
-
- if (!nextStart()) {
- return false
- }
-
- while (true) {
- val token = parser.nextValue()
-
- if (token == null || token == JsonToken.END_OBJECT) {
- break
- }
-
- when (parser.currentName) {
- "vm id" -> id = parser.text
- "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue)
- "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue)
- "vm virtual core count" -> cpuCores = parser.intValue
- "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB
- }
- }
-
- return true
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_START_TIME -> startTime
- RESOURCE_STOP_TIME -> stopTime
- RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_NCPUS -> cpuCores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_MEM_CAPACITY -> memCapacity
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- parser.close()
- }
-
- /**
- * Advance the parser until the next object start.
- */
- private fun nextStart(): Boolean {
- var token = parser.nextValue()
-
- while (token != null && token != JsonToken.START_OBJECT) {
- token = parser.nextValue()
- }
-
- return token != null
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var startTime: Instant? = null
- private var stopTime: Instant? = null
- private var cpuCores = -1
- private var memCapacity = Double.NaN
-
- /**
- * Reset the state.
- */
- fun reset() {
- id = null
- startTime = null
- stopTime = null
- cpuCores = -1
- memCapacity = Double.NaN
- }
-
- companion object {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema = CsvSchema.builder()
- .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
- .addColumn("subscription id", CsvSchema.ColumnType.STRING)
- .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
- .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
deleted file mode 100644
index 24c60bab..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the Azure v1 VM traces.
- */
-class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean = name in tables
-
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_RESOURCES -> AzureResourceTable(factory, path)
- TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path)
- else -> null
- }
- }
-
- override fun toString(): String = "AzureTrace[$path]"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
deleted file mode 100644
index 744e43a0..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.azure
-
-import com.fasterxml.jackson.dataformat.csv.CsvFactory
-import com.fasterxml.jackson.dataformat.csv.CsvParser
-import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
-
-/**
- * A format implementation for the Azure v1 format.
- */
-class AzureTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "azure-v1"
-
- /**
- * The [CsvFactory] used to create the parser.
- */
- private val factory = CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
-
- /**
- * Open the trace file.
- */
- override fun open(url: URL): AzureTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return AzureTrace(factory, path)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
deleted file mode 100644
index f051bf88..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.nio.file.Path
-
-/**
- * The resource state [Table] in the Bitbrains Parquet format.
- */
-internal class BPResourceStateTable(private val path: Path) : Table {
- override val name: String = TABLE_RESOURCE_STATES
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_DURATION,
- RESOURCE_STATE_NCPUS,
- RESOURCE_STATE_CPU_USAGE,
- )
-
- override fun newReader(): TableReader {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
- return BPResourceStateTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Unknown partition $partition")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
deleted file mode 100644
index 0e7ee555..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
-
-/**
- * A [TableReader] implementation for the Bitbrains Parquet format.
- */
-internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
- /**
- * The current record.
- */
- private var record: GenericRecord? = null
-
- override fun nextRow(): Boolean {
- record = reader.read()
- return record != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_USAGE -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_STATE_ID -> record["id"].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
- RESOURCE_STATE_NCPUS -> record["cores"]
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- RESOURCE_STATE_NCPUS -> record["cores"] as Int
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- reader.close()
- }
-
- override fun toString(): String = "BPResourceStateTableReader"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
deleted file mode 100644
index 5b0f013f..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.nio.file.Path
-
-/**
- * The resource [Table] in the Bitbrains Parquet format.
- */
-internal class BPResourceTable(private val path: Path) : Table {
- override val name: String = TABLE_RESOURCES
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
- )
-
- override fun newReader(): TableReader {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
- return BPResourceTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Unknown partition $partition")
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
deleted file mode 100644
index 4416aae8..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.trace.*
-import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Instant
-
-/**
- * A [TableReader] implementation for the Bitbrains Parquet format.
- */
-internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
- /**
- * The current record.
- */
- private var record: GenericRecord? = null
-
- override fun nextRow(): Boolean {
- record = reader.read()
- return record != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_ID -> record["id"].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- RESOURCE_NCPUS -> record["maxCores"] as Int
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- reader.close()
- }
-
- override fun toString(): String = "BPResourceTableReader"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
deleted file mode 100644
index 486587b1..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.opendc.trace.TABLE_RESOURCES
-import org.opendc.trace.TABLE_RESOURCE_STATES
-import org.opendc.trace.Table
-import org.opendc.trace.Trace
-import java.nio.file.Path
-
-/**
- * A [Trace] in the Bitbrains Parquet format.
- */
-public class BPTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean =
- name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES
-
- override fun getTable(name: String): Table? {
- return when (name) {
- TABLE_RESOURCES -> BPResourceTable(path)
- TABLE_RESOURCE_STATES -> BPResourceStateTable(path)
- else -> null
- }
- }
-
- override fun toString(): String = "BPTrace[$path]"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
deleted file mode 100644
index 49d5b4c5..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
-
-/**
- * A format implementation for the GWF trace format.
- */
-public class BPTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "bitbrains-parquet"
-
- /**
- * Open a Bitbrains Parquet trace.
- */
- override fun open(url: URL): BPTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return BPTrace(path)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
deleted file mode 100644
index 7dd8161d..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.bp
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-
-/**
- * Schema for the resources table in the trace.
- */
-val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.trace.capelin")
- .fields()
- .requiredString("id")
- .requiredLong("submissionTime")
- .requiredLong("endTime")
- .requiredInt("maxCores")
- .requiredLong("requiredMemory")
- .endRecord()
-
-/**
- * Schema for the resource states table in the trace.
- */
-val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.trace.capelin")
- .fields()
- .requiredString("id")
- .requiredLong("time")
- .requiredLong("duration")
- .requiredInt("cores")
- .requiredDouble("cpuUsage")
- .requiredLong("flops")
- .endRecord()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
deleted file mode 100644
index 67140fe9..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.*
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.stream.Collectors
-import kotlin.io.path.bufferedReader
-import kotlin.io.path.extension
-import kotlin.io.path.nameWithoutExtension
-
-/**
- * The resource state [Table] in the extended Bitbrains format.
- */
-internal class SvResourceStateTable(path: Path) : Table {
- /**
- * The partitions that belong to the table.
- */
- private val partitions = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "txt" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
-
- override val name: String = TABLE_RESOURCE_STATES
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- RESOURCE_STATE_ID,
- RESOURCE_STATE_CLUSTER_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_NCPUS,
- RESOURCE_STATE_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT,
- RESOURCE_STATE_CPU_DEMAND,
- RESOURCE_STATE_CPU_READY_PCT,
- RESOURCE_STATE_MEM_CAPACITY,
- RESOURCE_STATE_DISK_READ,
- RESOURCE_STATE_DISK_WRITE,
- )
-
- override fun newReader(): TableReader {
- val it = partitions.iterator()
-
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- }
-
- this.delegate = delegate
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
- return if (it.hasNext()) {
- val (_, path) = it.next()
- val reader = path.bufferedReader()
- return SvResourceStateTableReader(reader)
- } else {
- null
- }
- }
-
- override fun toString(): String = "SvCompositeTableReader"
- }
- }
-
- override fun newReader(partition: String): TableReader {
- val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
- val reader = path.bufferedReader()
- return SvResourceStateTableReader(reader)
- }
-
- override fun toString(): String = "SvResourceStateTable"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
deleted file mode 100644
index 6ea403fe..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.*
-import java.io.BufferedReader
-import java.time.Instant
-
-/**
- * A [TableReader] for the Bitbrains resource state table.
- */
-internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
- override fun nextRow(): Boolean {
- reset()
-
- var line: String
- var num = 0
-
- while (true) {
- line = reader.readLine() ?: return false
- num++
-
- if (line[0] == '#' || line.isBlank()) {
- // Ignore empty lines or comments
- continue
- }
-
- break
- }
-
- line = line.trim()
-
- val length = line.length
- var col = 0
- var start: Int
- var end = 0
-
- while (end < length) {
- // Trim all whitespace before the field
- start = end
- while (start < length && line[start].isWhitespace()) {
- start++
- }
-
- end = line.indexOf(' ', start)
-
- if (end < 0) {
- end = length
- }
-
- val field = line.subSequence(start, end) as String
- when (col++) {
- COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10))
- COL_CPU_USAGE -> cpuUsage = field.toDouble()
- COL_CPU_DEMAND -> cpuDemand = field.toDouble()
- COL_DISK_READ -> diskRead = field.toDouble()
- COL_DISK_WRITE -> diskWrite = field.toDouble()
- COL_CLUSTER_ID -> cluster = field.trim()
- COL_NCPUS -> cpuCores = field.toInt(10)
- COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble()
- COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
- COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
- COL_ID -> id = field.trim()
- COL_MEM_CAPACITY -> memCapacity = field.toDouble()
- }
- }
-
- return true
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_CLUSTER_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_CPU_DEMAND -> true
- RESOURCE_STATE_CPU_READY_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- else -> false
- }
- }
-
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_STATE_ID -> id
- RESOURCE_STATE_CLUSTER_ID -> cluster
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
- RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
- RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
- RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
- RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
- RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ)
- RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE)
- else -> throw IllegalArgumentException("Invalid column")
- }
-
- @Suppress("UNCHECKED_CAST")
- return res as T
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- return when (column) {
- RESOURCE_STATE_POWERED_ON -> poweredOn
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_STATE_NCPUS -> cpuCores
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
- RESOURCE_STATE_CPU_DEMAND -> cpuDemand
- RESOURCE_STATE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun close() {
- reader.close()
- }
-
- /**
- * State fields of the reader.
- */
- private var id: String? = null
- private var cluster: String? = null
- private var timestamp: Instant? = null
- private var cpuCores = -1
- private var cpuCapacity = Double.NaN
- private var cpuUsage = Double.NaN
- private var cpuDemand = Double.NaN
- private var cpuReadyPct = Double.NaN
- private var memCapacity = Double.NaN
- private var diskRead = Double.NaN
- private var diskWrite = Double.NaN
- private var poweredOn: Boolean = false
-
- /**
- * Reset the state of the reader.
- */
- private fun reset() {
- id = null
- timestamp = null
- cluster = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuDemand = Double.NaN
- cpuReadyPct = Double.NaN
- memCapacity = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- poweredOn = false
- }
-
- /**
- * Default column indices for the extended Bitbrains format.
- */
- private val COL_TIMESTAMP = 0
- private val COL_CPU_USAGE = 1
- private val COL_CPU_DEMAND = 2
- private val COL_DISK_READ = 4
- private val COL_DISK_WRITE = 6
- private val COL_CLUSTER_ID = 10
- private val COL_NCPUS = 12
- private val COL_CPU_READY_PCT = 13
- private val COL_POWERED_ON = 14
- private val COL_CPU_CAPACITY = 18
- private val COL_ID = 19
- private val COL_MEM_CAPACITY = 20
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
deleted file mode 100644
index dbd63de5..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.*
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the extended Bitbrains format.
- */
-public class SvTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
-
- override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
-
- override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
- }
-
- return SvResourceStateTable(path)
- }
-
- override fun toString(): String = "SvTrace[$path]"
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
deleted file mode 100644
index 0cce8559..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace.sv
-
-import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
-
-/**
- * A format implementation for the extended Bitbrains trace format.
- */
-public class SvTraceFormat : TraceFormat {
- /**
- * The name of this trace format.
- */
- override val name: String = "sv"
-
- /**
- * Open the trace file.
- */
- override fun open(url: URL): SvTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return SvTrace(path)
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
deleted file mode 100644
index 065a8c93..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.util
-
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.simulator.SimHost
-import org.opendc.experiments.capelin.env.MachineDef
-import org.opendc.experiments.capelin.trace.TraceReader
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.kernel.SimHypervisorProvider
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.compute.*
-import org.opendc.telemetry.sdk.toOtelClock
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.max
-
-/**
- * Helper class to manage a [ComputeService] simulation.
- */
-class ComputeServiceSimulator(
- private val context: CoroutineContext,
- private val clock: Clock,
- scheduler: ComputeScheduler,
- machines: List<MachineDef>,
- private val failureModel: FailureModel? = null,
- interferenceModel: VmInterferenceModel? = null,
- hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider()
-) : AutoCloseable {
- /**
- * The [ComputeService] that has been configured by the manager.
- */
- val service: ComputeService
-
- /**
- * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts.
- */
- val producers: List<MetricProducer>
- get() = _metricProducers
- private val _metricProducers = mutableListOf<MetricProducer>()
-
- /**
- * The [SimResourceInterpreter] to simulate the hosts.
- */
- private val interpreter = SimResourceInterpreter(context, clock)
-
- /**
- * The hosts that belong to this class.
- */
- private val hosts = mutableSetOf<SimHost>()
-
- init {
- val (service, serviceMeterProvider) = createService(scheduler)
- this._metricProducers.add(serviceMeterProvider)
- this.service = service
-
- for (def in machines) {
- val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel)
- this._metricProducers.add(hostMeterProvider)
- hosts.add(host)
- this.service.addHost(host)
- }
- }
-
- /**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
- */
- suspend fun run(reader: TraceReader<SimWorkload>) {
- val injector = failureModel?.createInjector(context, clock, service)
- val client = service.newClient()
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- // Start the fault injector
- injector?.start()
-
- var offset = Long.MIN_VALUE
-
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
-
- launch {
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
-
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta + mapOf("workload" to workload)
- )
-
- // Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
-
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
- }
- }
- }
-
- yield()
- } finally {
- injector?.close()
- reader.close()
- client.close()
- }
- }
-
- override fun close() {
- service.close()
-
- for (host in hosts) {
- host.close()
- }
-
- hosts.clear()
- }
-
- /**
- * Construct a [ComputeService] instance.
- */
- private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> {
- val resource = Resource.builder()
- .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
-
- val service = ComputeService(context, clock, meterProvider, scheduler)
- return service to meterProvider
- }
-
- /**
- * Construct a [SimHost] instance for the specified [MachineDef].
- */
- private fun createHost(
- def: MachineDef,
- hypervisorProvider: SimHypervisorProvider,
- interferenceModel: VmInterferenceModel? = null
- ): Pair<SimHost, SdkMeterProvider> {
- val resource = Resource.builder()
- .put(HOST_ID, def.uid.toString())
- .put(HOST_NAME, def.name)
- .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(HOST_NCPUS, def.model.cpus.size)
- .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size })
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
-
- val host = SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- context,
- interpreter,
- meterProvider,
- hypervisorProvider,
- powerDriver = SimplePowerDriver(def.powerModel),
- interferenceDomain = interferenceModel?.newDomain()
- )
-
- return host to meterProvider
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
deleted file mode 100644
index 83393896..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.util
-
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
- */
-interface FailureModel {
- /**
- * Construct a [HostFaultInjector] for the specified [service].
- */
- fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
deleted file mode 100644
index 89b4a31c..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("FailureModels")
-package org.opendc.experiments.capelin
-
-import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.commons.math3.random.Well19937c
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import org.opendc.compute.simulator.failure.StartStopHostFault
-import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import org.opendc.experiments.capelin.util.FailureModel
-import java.time.Clock
-import java.time.Duration
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.ln
-import kotlin.random.Random
-
-/**
- * Obtain a [FailureModel] based on the GRID'5000 failure trace.
- *
- * This fault injector uses parameters from the GRID'5000 failure trace as described in
- * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
- */
-fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
- return object : FailureModel {
- override fun createInjector(
- context: CoroutineContext,
- clock: Clock,
- service: ComputeService
- ): HostFaultInjector {
- val rng = Well19937c(seed)
- val hosts = service.hosts.map { it as SimHost }.toSet()
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
- }
-
- override fun toString(): String = "Grid5000FailureModel"
- }
-}
-
-/**
- * Obtain the [HostFaultInjector] to use for the experiments.
- *
- * This fault injector uses parameters from the GRID'5000 failure trace as described in
- * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
- */
-fun createFaultInjector(
- context: CoroutineContext,
- clock: Clock,
- hosts: Set<SimHost>,
- seed: Int,
- failureInterval: Double
-): HostFaultInjector {
- val rng = Well19937c(seed)
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
index b55bd577..67de2777 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.trace
+package org.opendc.experiments.capelin.util
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
index d1c01b8e..d46b50c3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml
@@ -36,7 +36,7 @@
<Logger name="org.opendc.experiments.capelin" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
- <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false">
+ <Logger name="org.opendc.experiments.vm.trace" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hadoop" level="warn" additivity="false">