summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 11:50:43 +0200
committerGitHub <noreply@github.com>2021-09-02 11:50:43 +0200
commit05f80bd9fb7caf765e3ebbb70d48d0d5e185bd42 (patch)
tree5fa9501621ad327028c2f2e12c9c367f44f6aebe /opendc-experiments
parent99f391d11db57c3db3f326958de8f66502969cdb (diff)
parent5935531137a22fdb920921580d491f86adec65c9 (diff)
merge: Add generic trace reading library
This pull request adds a generic trace reading library to OpenDC. The library has been designed to support a wide range of trace formats and uses a streaming approach to improve performance of reading large traces. * Add trace reading API * Implement API for GWF format * Implement API for SWF format * Implement API for WTF format * Implement API for Bitbrains format * Implement API for Bitbrains Parquet format **Breaking API Changes** * `opendc-format` has been removed in favour of `opendc-trace-*`
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt35
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt64
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt263
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt32
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt56
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt103
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt49
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt140
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt230
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt45
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt47
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt4
-rw-r--r--opendc-experiments/opendc-experiments-energy21/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-tf20/build.gradle.kts5
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt3
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt17
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt38
29 files changed, 1194 insertions, 249 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 53643aba..65cebe1b 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -31,7 +31,8 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
+ implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcTrace.opendcTraceBitbrains)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcSimulator.opendcSimulatorFailures)
@@ -43,6 +44,9 @@ dependencies {
implementation(libs.config)
implementation(libs.progressbar)
implementation(libs.clikt)
+ implementation(libs.jackson.module.kotlin) {
+ exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
+ }
implementation(libs.parquet)
testImplementation(libs.log4j.slf4j)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 7f428b2a..46e11056 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -45,10 +45,10 @@ import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.trace.TraceReader
+import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.power.SimplePowerDriver
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
index d73d14f5..babd8ada 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt
@@ -22,8 +22,6 @@
package org.opendc.experiments.capelin.env
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
new file mode 100644
index 00000000..a968b043
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.env
+
+import java.io.Closeable
+
+/**
+ * An interface for reading descriptions of topology environments into memory.
+ */
+public interface EnvironmentReader : Closeable {
+ /**
+ * Read the environment into a list.
+ */
+ public fun read(): List<MachineDef>
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
new file mode 100644
index 00000000..b0c0318f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.env
+
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerModel
+import java.util.*
+
+/**
+ * A definition of a machine in a cluster.
+ */
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerModel: PowerModel
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
index d8f7ff75..897a6692 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -28,7 +28,7 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.experiments.capelin.telemetry.Event
-import org.opendc.format.util.LocalOutputFile
+import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.Closeable
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 0f49ecd2..0bf4ada6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -24,8 +24,6 @@ package org.opendc.experiments.capelin.trace
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index 16ad6816..fa4e9ed8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -22,12 +22,10 @@
package org.opendc.experiments.capelin.trace
-import org.apache.avro.generic.GenericData
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalParquetReader
+import org.opendc.experiments.capelin.trace.bp.BPTraceFormat
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.trace.*
import java.io.File
import java.util.UUID
@@ -38,26 +36,29 @@ import java.util.UUID
*/
class RawParquetTraceReader(private val path: File) {
/**
+ * The [Trace] that represents this trace.
+ */
+ private val trace = BPTraceFormat().open(path.toURI().toURL())
+
+ /**
* Read the fragments into memory.
*/
- private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet"))
+ private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
return try {
- while (true) {
- val record = reader.read() ?: break
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
+ val time = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val duration = reader.get(RESOURCE_STATE_DURATION)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
+ time.toEpochMilli(),
+ duration.toMillis(),
cpuUsage,
cores
)
@@ -74,25 +75,24 @@ class RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet"))
+ private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
var counter = 0
val entries = mutableListOf<TraceEntry<SimWorkload>>()
return try {
- while (true) {
- val record = metaReader.read() ?: break
+ while (reader.nextRow()) {
- val id = record["id"].toString()
+ val id = reader.get(RESOURCE_ID)
if (!fragments.containsKey(id)) {
continue
}
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
+ val submissionTime = reader.get(RESOURCE_START_TIME)
+ val endTime = reader.get(RESOURCE_END_TIME)
+ val maxCores = reader.getInt(RESOURCE_NCPUS)
+ val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY)
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
@@ -100,13 +100,13 @@ class RawParquetTraceReader(private val path: File) {
val workload = SimTraceWorkload(vmFragments)
entries.add(
TraceEntry(
- uid, id, submissionTime, workload,
+ uid, id, submissionTime.toEpochMilli(), workload,
mapOf(
- "submit-time" to submissionTime,
- "end-time" to endTime,
+ "submit-time" to submissionTime.toEpochMilli(),
+ "end-time" to endTime.toEpochMilli(),
"total-load" to totalLoad,
"cores" to maxCores,
- "required-memory" to requiredMemory,
+ "required-memory" to requiredMemory.toLong(),
"workload" to workload
)
)
@@ -118,7 +118,7 @@ class RawParquetTraceReader(private val path: File) {
e.printStackTrace()
throw e
} finally {
- metaReader.close()
+ reader.close()
}
}
@@ -128,8 +128,8 @@ class RawParquetTraceReader(private val path: File) {
private val entries: List<TraceEntry<SimWorkload>>
init {
- val fragments = parseFragments(path)
- entries = parseMeta(path, fragments)
+ val fragments = parseFragments()
+ entries = parseMeta(fragments)
}
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index 35f4c5b8..61e4cab5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -30,11 +30,9 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalInputFile
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.trace.util.parquet.LocalInputFile
import java.io.File
import java.io.Serializable
import java.util.SortedSet
@@ -187,7 +185,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = e
assert(uid !in takenIds)
takenIds += uid
- logger.info("Processing VM $id")
+ logger.info { "Processing VM $id" }
val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index d7daa35b..a021de8d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -36,9 +36,11 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
-import org.opendc.format.util.LocalOutputFile
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
+import org.opendc.trace.*
+import org.opendc.trace.bitbrains.BitbrainsTraceFormat
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -156,220 +158,101 @@ sealed class TraceConversion(name: String) : OptionGroup(name) {
): MutableList<Fragment>
}
-class SolvinityConversion : TraceConversion("Solvinity") {
- private val clusters by option()
- .split(",")
-
- private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
- .file(canBeDir = false)
- .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } }
- .required()
+/**
+ * A [TraceConversion] that uses the Trace API to perform the conversion.
+ */
+abstract class AbstractConversion(name: String) : TraceConversion(name) {
+ abstract val format: TraceFormat
override fun read(
traceDirectory: File,
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
- val clusters = clusters?.toSet() ?: emptySet()
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
- // Identify start time of the entire trace
- var minTimestamp = Long.MAX_VALUE
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach file@{ vmFile ->
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val vmId = vmFile.name
+ val fragments = mutableListOf<Fragment>()
+ val trace = format.open(traceDirectory.toURI().toURL())
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
+ var lastId: String? = null
+ var maxCores = Int.MIN_VALUE
+ var requiredMemory = Long.MIN_VALUE
+ var minTime = Long.MAX_VALUE
+ var maxTime = Long.MIN_VALUE
+ var lastTimestamp = Long.MIN_VALUE
- val values = line.split("\t")
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
- if (timestamp < minTimestamp) {
- minTimestamp = timestamp
- }
- return@file
- }
- }
- }
+ if (lastId != null && lastId != id) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", lastId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
}
+ lastId = id
- println("Start of trace at $minTimestamp")
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val timestampMs = timestamp.toEpochMilli()
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
- val allFragments = mutableListOf<Fragment>()
+ maxCores = max(maxCores, cores)
+ requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong())
- val begin = 15 * 24 * 60 * 60 * 1000L
- val end = 45 * 24 * 60 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split("\t")
-
- vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val timestamp =
- (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
- if (begin > timestamp || timestamp > end) {
- continue
- }
-
- cores = values[coreCol].trim().toInt()
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
- var maxTime = Long.MIN_VALUE
- flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
-
- if (minTime in begin until end) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestampMs - 5 * 60 * 1000L
+ minTime = min(minTime, lastTimestamp)
}
- return allFragments
- }
-}
-
-/**
- * Conversion of the Bitbrains public trace.
- */
-class BitbrainsConversion : TraceConversion("Bitbrains") {
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val fragments = mutableListOf<Fragment>()
- BitbrainsTraceReader(traceDirectory).use { reader ->
- reader.forEach { entry ->
- val trace = (entry.workload as SimTraceWorkload).trace
- var maxTime = Long.MIN_VALUE
- trace.forEach { fragment ->
- val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ if (fragments.isEmpty()) {
+ val duration = 5 * 60 * 1000L
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+ fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores))
+ } else {
+ val last = fragments.last()
+ val duration = timestampMs - lastTimestamp
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+
+ // Perform run-length encoding
+ if (last.id == id && (duration == 0L || last.usage == cpuUsage)) {
+ fragments[fragments.size - 1] = last.copy(duration = last.duration + duration)
+ } else {
fragments.add(
Fragment(
- entry.name,
- fragment.timestamp,
+ id,
+ lastTimestamp,
flops,
- fragment.duration,
- fragment.usage,
- fragment.cores
+ duration,
+ cpuUsage,
+ cores
)
)
- maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
-
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.name)
- metaRecord.put("submissionTime", entry.start)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", entry.meta["cores"])
- metaRecord.put("requiredMemory", entry.meta["required-memory"])
- metaWriter.write(metaRecord)
}
+
+ val last = fragments.last()
+ maxTime = max(maxTime, last.tick + last.duration)
+ lastTimestamp = timestampMs
}
return fragments
}
}
+class SolvinityConversion : AbstractConversion("Solvinity") {
+ override val format: TraceFormat = SvTraceFormat()
+}
+
+/**
+ * Conversion of the Bitbrains public trace.
+ */
+class BitbrainsConversion : AbstractConversion("Bitbrains") {
+ override val format: TraceFormat = BitbrainsTraceFormat()
+}
+
/**
* Conversion of the Azure public VM trace.
*/
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
new file mode 100644
index 00000000..303a6a8c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt
@@ -0,0 +1,44 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+import java.util.UUID
+
+/**
+ * An entry in a workload trace.
+ *
+ * @param uid The unique identifier of the entry.
+ * @param name The name of the entry.
+ * @param start The start time of the workload.
+ * @param workload The workload of the entry.
+ * @param meta The meta-data associated with the workload.
+ */
+public data class TraceEntry<out T>(
+ val uid: UUID,
+ val name: String,
+ val start: Long,
+ val workload: T,
+ val meta: Map<String, Any>
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
new file mode 100644
index 00000000..08304edc
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace
+
+/**
+ * An interface for reading workloads into memory.
+ *
+ * This interface must guarantee that the entries are delivered in order of
+ * submission time.
+ *
+ * Implementations may hold open resources (files, streams) while iterating,
+ * so callers should [close] the reader when done, even if the iterator was
+ * not exhausted.
+ *
+ * @param T The shape of the workloads supported by this reader.
+ */
+public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
index 6de3f265..cb32ce88 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -26,7 +26,6 @@ import mu.KotlinLogging
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.format.trace.TraceEntry
import org.opendc.simulator.compute.workload.SimWorkload
import java.util.*
import kotlin.random.Random
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
new file mode 100644
index 00000000..35bfd5ef
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * The resource state [Table] of a trace in the Bitbrains Parquet format.
+ *
+ * The states are stored in a single `trace.parquet` file under [path].
+ */
+internal class BPResourceStateTable(private val path: Path) : Table {
+    override val name: String = TABLE_RESOURCE_STATES
+    override val isSynthetic: Boolean = false
+
+    /** The set of columns exposed by this table. */
+    private val supportedColumns = setOf<TableColumn<*>>(
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_DURATION,
+        RESOURCE_STATE_NCPUS,
+        RESOURCE_STATE_CPU_USAGE
+    )
+
+    override fun isSupported(column: TableColumn<*>): Boolean = column in supportedColumns
+
+    override fun newReader(): TableReader =
+        BPResourceStateTableReader(LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")))
+
+    override fun newReader(partition: String): TableReader {
+        throw IllegalArgumentException("Unknown partition $partition")
+    }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
new file mode 100644
index 00000000..0e7ee555
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [TableReader] implementation for the resource state table of the
+ * Bitbrains Parquet format, backed by a [LocalParquetReader] that yields Avro
+ * [GenericRecord]s.
+ *
+ * The reader is a stateful cursor: callers must invoke [nextRow] (and check
+ * its result) before reading any column value.
+ */
+internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+ /**
+ * The current record, or `null` before the first [nextRow] call and after
+ * the underlying reader has been exhausted.
+ */
+ private var record: GenericRecord? = null
+
+ // Advance the cursor; false signals the end of the Parquet file.
+ override fun nextRow(): Boolean {
+ record = reader.read()
+ return record != null
+ }
+
+ // The fixed set of columns present in this format.
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_DURATION -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ else -> false
+ }
+ }
+
+ // Generic accessor: converts raw Avro fields to the column's declared type
+ // (epoch millis -> Instant for "time", millis -> Duration for "duration").
+ override fun <T> get(column: TableColumn<T>): T {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ @Suppress("UNCHECKED_CAST")
+ val res: Any = when (column) {
+ RESOURCE_STATE_ID -> record["id"].toString()
+ RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
+ RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
+ RESOURCE_STATE_NCPUS -> record["cores"]
+ RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ // No boolean-typed columns exist in this table.
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_STATE_NCPUS -> record["cores"] as Int
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ // NOTE(review): "time" and "duration" are long-backed fields but are only
+ // exposed via get() as Instant/Duration — confirm that rejecting every
+ // column here is intentional.
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (column) {
+ RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ // Release the underlying Parquet file handle.
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "BPResourceStateTableReader"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
new file mode 100644
index 00000000..74d1e574
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * The resource [Table] of a trace in the Bitbrains Parquet format.
+ *
+ * The resources are stored in a single `meta.parquet` file under [path].
+ */
+internal class BPResourceTable(private val path: Path) : Table {
+    override val name: String = TABLE_RESOURCES
+    override val isSynthetic: Boolean = false
+
+    /** The set of columns exposed by this table. */
+    private val supportedColumns = setOf<TableColumn<*>>(
+        RESOURCE_ID,
+        RESOURCE_START_TIME,
+        RESOURCE_END_TIME,
+        RESOURCE_NCPUS,
+        RESOURCE_MEM_CAPACITY
+    )
+
+    override fun isSupported(column: TableColumn<*>): Boolean = column in supportedColumns
+
+    override fun newReader(): TableReader =
+        BPResourceTableReader(LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")))
+
+    override fun newReader(partition: String): TableReader {
+        throw IllegalArgumentException("Unknown partition $partition")
+    }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
new file mode 100644
index 00000000..0a105783
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Instant
+
+/**
+ * A [TableReader] implementation for the resource (meta) table of the
+ * Bitbrains Parquet format, backed by a [LocalParquetReader] that yields Avro
+ * [GenericRecord]s.
+ *
+ * The reader is a stateful cursor: callers must invoke [nextRow] (and check
+ * its result) before reading any column value.
+ */
+internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+ /**
+ * The current record, or `null` before the first [nextRow] call and after
+ * the underlying reader has been exhausted.
+ */
+ private var record: GenericRecord? = null
+
+ // Advance the cursor; false signals the end of the Parquet file.
+ override fun nextRow(): Boolean {
+ record = reader.read()
+ return record != null
+ }
+
+ // The fixed set of columns present in this format.
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ RESOURCE_START_TIME -> true
+ RESOURCE_END_TIME -> true
+ RESOURCE_NCPUS -> true
+ RESOURCE_MEM_CAPACITY -> true
+ else -> false
+ }
+ }
+
+ // Generic accessor: converts raw Avro fields to the column's declared type
+ // ("submissionTime"/"endTime" are epoch millis converted to Instant).
+ override fun <T> get(column: TableColumn<T>): T {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ @Suppress("UNCHECKED_CAST")
+ val res: Any = when (column) {
+ RESOURCE_ID -> record["id"].toString()
+ RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
+ RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
+ RESOURCE_NCPUS -> record["maxCores"]
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ // No boolean-typed columns exist in this table.
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_NCPUS -> record["maxCores"] as Int
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ // NOTE(review): the long-backed time fields are only exposed via get() as
+ // Instant — confirm that rejecting every column here is intentional.
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ // Release the underlying Parquet file handle.
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "BPResourceTableReader"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
new file mode 100644
index 00000000..486587b1
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.opendc.trace.TABLE_RESOURCES
+import org.opendc.trace.TABLE_RESOURCE_STATES
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * A [Trace] stored in the Bitbrains Parquet format.
+ *
+ * A trace of this format exposes exactly two tables: the resource table and
+ * the resource state table.
+ */
+public class BPTrace internal constructor(private val path: Path) : Trace {
+    override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+    override fun containsTable(name: String): Boolean = name in tables
+
+    override fun getTable(name: String): Table? {
+        if (!containsTable(name)) {
+            return null
+        }
+        return if (name == TABLE_RESOURCES) BPResourceTable(path) else BPResourceStateTable(path)
+    }
+
+    override fun toString(): String = "BPTrace[$path]"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
new file mode 100644
index 00000000..49d5b4c5
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation for the Bitbrains Parquet trace format.
+ */
+public class BPTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "bitbrains-parquet"
+
+ /**
+ * Open a Bitbrains Parquet trace located at the path denoted by [url].
+ *
+ * @throws IllegalArgumentException if the path denoted by [url] does not exist.
+ */
+ override fun open(url: URL): BPTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return BPTrace(path)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
new file mode 100644
index 00000000..71c9d52e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.bufferedReader
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] in the extended Bitbrains format.
+ *
+ * Every `.txt` file directly under the trace directory is treated as one
+ * partition, keyed by its file name without the extension.
+ */
+internal class SvResourceStateTable(path: Path) : Table {
+ /**
+ * The partitions that belong to the table: a map from partition name (the
+ * file name without its ".txt" extension) to the file path.
+ */
+ private val partitions = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "txt" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+
+ override val name: String = TABLE_RESOURCE_STATES
+
+ override val isSynthetic: Boolean = false
+
+ // The fixed set of columns supported by the extended Bitbrains format.
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_CLUSTER_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_CPU_DEMAND -> true
+ RESOURCE_STATE_CPU_READY_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ else -> false
+ }
+ }
+
+ // Returns a reader that concatenates the per-partition readers: when one
+ // partition is exhausted it is closed and the next one is opened lazily.
+ override fun newReader(): TableReader {
+ val it = partitions.iterator()
+
+ return object : TableReader {
+ // The reader of the current partition; null once all are consumed.
+ var delegate: TableReader? = nextDelegate()
+
+ override fun nextRow(): Boolean {
+ var delegate = delegate
+
+ // Advance within the current partition; on exhaustion, close it
+ // and fall through to the next partition until a row is found.
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextDelegate()
+ }
+
+ this.delegate = delegate
+ return delegate != null
+ }
+
+ // Column presence is delegated; false when all partitions are consumed.
+ override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(column)
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(column)
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(column)
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(column)
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(column)
+ }
+
+ override fun close() {
+ // Only the active delegate is open at this point; partitions that
+ // were never reached hold no resources yet.
+ delegate?.close()
+ }
+
+ // Open a reader over the next partition, or null when none remain.
+ private fun nextDelegate(): TableReader? {
+ return if (it.hasNext()) {
+ val (partition, path) = it.next()
+ val reader = path.bufferedReader()
+ return SvResourceStateTableReader(partition, reader)
+ } else {
+ null
+ }
+ }
+
+ // NOTE(review): this name looks like a leftover from an earlier
+ // class name; kept as-is because it is runtime-visible output.
+ override fun toString(): String = "BitbrainsCompositeTableReader"
+ }
+ }
+
+ // Open a reader over a single named partition.
+ override fun newReader(partition: String): TableReader {
+ val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+ val reader = path.bufferedReader()
+ return SvResourceStateTableReader(partition, reader)
+ }
+
+ override fun toString(): String = "SvResourceStateTable"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
new file mode 100644
index 00000000..adcdb2ea
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.io.BufferedReader
+import java.time.Instant
+
+/**
+ * A [TableReader] for a single partition of the extended Bitbrains resource
+ * state table.
+ *
+ * Every non-blank, non-comment line of the underlying text file is one
+ * space-separated record; the expected column layout is given by the COL_*
+ * constants in the companion object.
+ *
+ * @param partition The name of the partition this reader belongs to.
+ * @param reader The reader over the partition's text file; released by [close].
+ */
+internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader {
+    /**
+     * The current parser state, overwritten by every call to [nextRow].
+     */
+    private val state = RowState()
+
+    override fun nextRow(): Boolean {
+        state.reset()
+
+        var line: String
+
+        while (true) {
+            line = reader.readLine() ?: return false
+
+            // Skip empty lines and comments. Check isBlank() first: indexing
+            // line[0] on an empty line would throw StringIndexOutOfBoundsException.
+            if (line.isBlank() || line[0] == '#') {
+                continue
+            }
+
+            break
+        }
+
+        line = line.trim()
+
+        val length = line.length
+        var col = 0
+        var start: Int
+        var end = 0
+
+        while (end < length) {
+            // Trim all whitespace before the field.
+            start = end
+            while (start < length && line[start].isWhitespace()) {
+                start++
+            }
+
+            // Fields are separated by single spaces; the last field on the
+            // (trimmed) line has no trailing separator, so treat end-of-line
+            // as its terminator instead of silently dropping it.
+            end = line.indexOf(' ', start)
+            if (end < 0) {
+                end = length
+            }
+            if (start >= end) {
+                break
+            }
+
+            val field = line.substring(start, end)
+            when (col++) {
+                COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10))
+                COL_CPU_USAGE -> state.cpuUsage = field.toDouble()
+                COL_CPU_DEMAND -> state.cpuDemand = field.toDouble()
+                COL_DISK_READ -> state.diskRead = field.toDouble()
+                COL_DISK_WRITE -> state.diskWrite = field.toDouble()
+                COL_CLUSTER_ID -> state.cluster = field.trim()
+                COL_NCPUS -> state.cpuCores = field.toInt(10)
+                COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble()
+                COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1
+                COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble()
+                COL_ID -> state.id = field.trim()
+                COL_MEM_CAPACITY -> state.memCapacity = field.toDouble()
+            }
+        }
+
+        return true
+    }
+
+    override fun hasColumn(column: TableColumn<*>): Boolean {
+        return when (column) {
+            RESOURCE_STATE_ID -> true
+            RESOURCE_STATE_CLUSTER_ID -> true
+            RESOURCE_STATE_TIMESTAMP -> true
+            RESOURCE_STATE_NCPUS -> true
+            RESOURCE_STATE_CPU_CAPACITY -> true
+            RESOURCE_STATE_CPU_USAGE -> true
+            RESOURCE_STATE_CPU_USAGE_PCT -> true
+            RESOURCE_STATE_CPU_DEMAND -> true
+            RESOURCE_STATE_CPU_READY_PCT -> true
+            RESOURCE_STATE_MEM_CAPACITY -> true
+            RESOURCE_STATE_DISK_READ -> true
+            RESOURCE_STATE_DISK_WRITE -> true
+            else -> false
+        }
+    }
+
+    override fun <T> get(column: TableColumn<T>): T {
+        // Every column advertised by hasColumn() must resolve here; the CPU
+        // demand and CPU ready columns were previously missing and threw.
+        val res: Any? = when (column) {
+            RESOURCE_STATE_ID -> state.id
+            RESOURCE_STATE_CLUSTER_ID -> state.cluster
+            RESOURCE_STATE_TIMESTAMP -> state.timestamp
+            RESOURCE_STATE_NCPUS -> state.cpuCores
+            RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+            RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+            RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
+            RESOURCE_STATE_CPU_DEMAND -> state.cpuDemand
+            RESOURCE_STATE_CPU_READY_PCT -> state.cpuReadyPct
+            RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+            RESOURCE_STATE_DISK_READ -> state.diskRead
+            RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        return res as T
+    }
+
+    override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+        return when (column) {
+            RESOURCE_STATE_POWERED_ON -> state.poweredOn
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getInt(column: TableColumn<Int>): Int {
+        return when (column) {
+            RESOURCE_STATE_NCPUS -> state.cpuCores
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getLong(column: TableColumn<Long>): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(column: TableColumn<Double>): Double {
+        return when (column) {
+            RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+            RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+            RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
+            RESOURCE_STATE_CPU_DEMAND -> state.cpuDemand
+            RESOURCE_STATE_CPU_READY_PCT -> state.cpuReadyPct
+            RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+            RESOURCE_STATE_DISK_READ -> state.diskRead
+            RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun close() {
+        reader.close()
+    }
+
+    /**
+     * Mutable holder for the values parsed from the current row. Fields not
+     * present on a row keep their reset defaults (null / -1 / NaN / false).
+     */
+    private class RowState {
+        @JvmField var id: String? = null
+        @JvmField var cluster: String? = null
+        @JvmField var timestamp: Instant? = null
+        @JvmField var cpuCores = -1
+        @JvmField var cpuCapacity = Double.NaN
+        @JvmField var cpuUsage = Double.NaN
+        @JvmField var cpuDemand = Double.NaN
+        @JvmField var cpuReadyPct = Double.NaN
+        @JvmField var memCapacity = Double.NaN
+        @JvmField var diskRead = Double.NaN
+        @JvmField var diskWrite = Double.NaN
+        @JvmField var poweredOn: Boolean = false
+
+        /**
+         * Reset all fields to their defaults before parsing a new row.
+         */
+        fun reset() {
+            id = null
+            timestamp = null
+            cluster = null
+            cpuCores = -1
+            cpuCapacity = Double.NaN
+            cpuUsage = Double.NaN
+            cpuDemand = Double.NaN
+            cpuReadyPct = Double.NaN
+            memCapacity = Double.NaN
+            diskRead = Double.NaN
+            diskWrite = Double.NaN
+            poweredOn = false
+        }
+    }
+
+    private companion object {
+        // Default column indices for the extended Bitbrains format.
+        const val COL_TIMESTAMP = 0
+        const val COL_CPU_USAGE = 1
+        const val COL_CPU_DEMAND = 2
+        const val COL_DISK_READ = 4
+        const val COL_DISK_WRITE = 6
+        const val COL_CLUSTER_ID = 10
+        const val COL_NCPUS = 12
+        const val COL_CPU_READY_PCT = 13
+        const val COL_POWERED_ON = 14
+        const val COL_CPU_CAPACITY = 18
+        const val COL_ID = 19
+        const val COL_MEM_CAPACITY = 20
+    }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
new file mode 100644
index 00000000..dbd63de5
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the extended Bitbrains format.
+ */
+public class SvTrace internal constructor(private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
+ }
+
+ return SvResourceStateTable(path)
+ }
+
+ override fun toString(): String = "SvTrace[$path]"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
new file mode 100644
index 00000000..0cce8559
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the extended Bitbrains trace format.
+ */
+public class SvTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "sv"
+
+ /**
+ * Open the trace file.
+ */
+ override fun open(url: URL): SvTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return SvTrace(path)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index e4d3fed3..2934bbe6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -36,13 +36,13 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
+import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.trace.TraceReader
+import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
index bc05f09b..7d34d098 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
@@ -31,7 +31,6 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcSimulator.opendcSimulatorFailures)
diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
index b088045b..882c4894 100644
--- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
@@ -34,8 +34,11 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(projects.opendcFormat)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.jackson.module.kotlin) {
+ exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
+ }
+ implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30")
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
index 9a48aced..2153a862 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
@@ -55,7 +55,8 @@ public class TensorFlowExperiment : Experiment(name = "tf20") {
.build()
val meter = meterProvider.get("opendc-tf20")
- val def = MLEnvironmentReader(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)).read().first()
+ val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile))
+ val def = MLEnvironmentReader().readEnvironment(envInput).first()
val device = SimTFDevice(
def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0],
def.model.memory[0], LinearPowerModel(250.0, 60.0)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
index 3e61f508..3cdf28fd 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
@@ -25,8 +25,6 @@ package org.opendc.experiments.tf20.util
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -36,13 +34,16 @@ import java.io.InputStream
import java.util.*
/**
- * An [EnvironmentReader] for the TensorFlow experiments.
+ * An environment reader for the TensorFlow experiments.
*/
-public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
+public class MLEnvironmentReader {
+ /**
+ * The [ObjectMapper] to convert the format.
+ */
+ private val mapper = jacksonObjectMapper()
- private val setup: Setup = mapper.readValue(input)
-
- override fun read(): List<MachineDef> {
+ public fun readEnvironment(input: InputStream): List<MachineDef> {
+ val setup: Setup = mapper.readValue(input)
var counter = 0
return setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -109,6 +110,4 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack
}
}
}
-
- override fun close() {}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
new file mode 100644
index 00000000..271f5923
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.tf20.util
+
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerModel
+import java.util.*
+
+/**
+ * A definition of a machine in a cluster.
+ */
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerModel: PowerModel
+)