summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 13:20:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-07 14:44:41 +0200
commit18ff316a6b6ab984ebf8283ea48ed98ec69d8295 (patch)
tree11d933753c515140a6ae846fe96448ad64b165aa /opendc-experiments/opendc-experiments-capelin/src
parent05aeb87093903df3fa00e05353a5f135293fca7e (diff)
refactor(capelin): Restructure input reading classes
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt5
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt19
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt6
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt4
8 files changed, 29 insertions, 38 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index d3bba182..4db04591 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -122,8 +122,9 @@ abstract class Portfolio(name: String) : Experiment(name) {
}
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
- PerformanceInterferenceReader(FileInputStream(config.getString("interference-model")))
- .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) }
+ PerformanceInterferenceReader()
+ .read(FileInputStream(config.getString("interference-model")))
+ .let { VmInterferenceModel(it, Random(seeder.nextLong())) }
else
null
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
index a19f5699..9549af42 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt
@@ -31,14 +31,13 @@ import java.io.InputStream
/**
* A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- *
- * @param input The input stream to read from.
- * @param mapper The Jackson object mapper to use.
*/
-class PerformanceInterferenceReader(
- private val input: InputStream,
- private val mapper: ObjectMapper = jacksonObjectMapper()
-) : AutoCloseable {
+class PerformanceInterferenceReader {
+ /**
+ * The [ObjectMapper] to use.
+ */
+ private val mapper = jacksonObjectMapper()
+
init {
mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
}
@@ -46,12 +45,8 @@ class PerformanceInterferenceReader(
/**
* Read the performance interface model from the input.
*/
- fun read(): List<VmInterferenceGroup> {
- return mapper.readValue(input)
- }
-
- override fun close() {
- input.close()
+ fun read(input: InputStream): List<VmInterferenceGroup> {
+ return input.use { mapper.readValue(input) }
}
private data class GroupMixin(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
index 61e4cab5..ed82217d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt
@@ -41,8 +41,6 @@ import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
-private val logger = KotlinLogging.logger {}
-
/**
* A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
*
@@ -50,6 +48,8 @@ private val logger = KotlinLogging.logger {}
* @param selectedVms The list of VMs to read from the trace.
*/
class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
+ private val logger = KotlinLogging.logger {}
+
/**
* The internal iterator to use for this reader.
*/
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
index 7a1683f0..b55bd577 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt
@@ -29,24 +29,19 @@ import java.io.InputStream
/**
* A parser for the JSON VM placement data files used for the TPDS article on Capelin.
- *
- * @param input The input stream to read from.
- * @param mapper The Jackson object mapper to use.
*/
-public class VmPlacementReader(
- private val input: InputStream,
- private val mapper: ObjectMapper = jacksonObjectMapper()
-) : AutoCloseable {
+class VmPlacementReader {
+ /**
+ * The [ObjectMapper] to parse the placement.
+ */
+ private val mapper = jacksonObjectMapper()
+
/**
* Read the VM placements from the input.
*/
- public fun read(): Map<String, String> {
+ fun read(input: InputStream): Map<String, String> {
return mapper.readValue<Map<String, String>>(input)
.mapKeys { "vm__workload__${it.key}.txt" }
.mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00
}
-
- override fun close() {
- input.close()
- }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
index 71c9d52e..24abb109 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -118,9 +118,9 @@ internal class SvResourceStateTable(path: Path) : Table {
private fun nextDelegate(): TableReader? {
return if (it.hasNext()) {
- val (partition, path) = it.next()
+ val (_, path) = it.next()
val reader = path.bufferedReader()
- return SvResourceStateTableReader(partition, reader)
+ return SvResourceStateTableReader(reader)
} else {
null
}
@@ -133,7 +133,7 @@ internal class SvResourceStateTable(path: Path) : Table {
override fun newReader(partition: String): TableReader {
val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
val reader = path.bufferedReader()
- return SvResourceStateTableReader(partition, reader)
+ return SvResourceStateTableReader(reader)
}
override fun toString(): String = "SvResourceStateTable"
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
index adcdb2ea..1a556f8d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
@@ -29,7 +29,7 @@ import java.time.Instant
/**
* A [TableReader] for the Bitbrains resource state table.
*/
-internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader {
+internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
/**
* The current parser state.
*/
@@ -57,7 +57,7 @@ internal class SvResourceStateTableReader(partition: String, private val reader:
val length = line.length
var col = 0
- var start = 0
+ var start: Int
var end = 0
while (end < length) {
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index abd8efeb..aed9a4bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -197,7 +197,9 @@ class CapelinIntegrationTest {
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
- PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) }
+ PerformanceInterferenceReader()
+ .read(perfInterferenceInput)
+ .let { VmInterferenceModel(it, Random(seed.toLong())) }
val meterProvider = createMeterProvider(clock)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
index 9b1513dc..fbc39b87 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt
@@ -33,9 +33,7 @@ class PerformanceInterferenceReaderTest {
@Test
fun testSmoke() {
val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
- val reader = PerformanceInterferenceReader(input)
-
- val result = reader.use { reader.read() }
+ val result = PerformanceInterferenceReader().read(input)
assertAll(
{ assertEquals(2, result.size) },