diff options
8 files changed, 115 insertions, 139 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index fa4e9ed8..ca937328 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -90,9 +90,9 @@ class RawParquetTraceReader(private val path: File) { } val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_END_TIME) + val endTime = reader.get(RESOURCE_STOP_TIME) val maxCores = reader.getInt(RESOURCE_NCPUS) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) + val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt index 74d1e574..bff8c55e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt @@ -38,7 +38,7 @@ internal class BPResourceTable(private val path: Path) : Table { return when (column) { RESOURCE_ID -> true RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true + RESOURCE_STOP_TIME -> true RESOURCE_NCPUS -> true RESOURCE_MEM_CAPACITY -> true else -> false diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt index 0a105783..4416aae8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt @@ -45,7 +45,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene return when (column) { RESOURCE_ID -> true RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true + RESOURCE_STOP_TIME -> true RESOURCE_NCPUS -> true RESOURCE_MEM_CAPACITY -> true else -> false @@ -59,9 +59,9 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene val res: Any = when (column) { RESOURCE_ID -> record["id"].toString() RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> record["maxCores"] - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) + RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -90,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt index 24abb109..3a9bda69 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt @@ -31,7 +31,7 @@ import kotlin.io.path.extension import kotlin.io.path.nameWithoutExtension /** - * The resource state [Table] in the Bitbrains format. + * The resource state [Table] in the extended Bitbrains format. */ internal class SvResourceStateTable(path: Path) : Table { /** @@ -40,6 +40,7 @@ internal class SvResourceStateTable(path: Path) : Table { private val partitions = Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "txt" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES @@ -126,7 +127,7 @@ internal class SvResourceStateTable(path: Path) : Table { } } - override fun toString(): String = "BitbrainsCompositeTableReader" + override fun toString(): String = "SvCompositeTableReader" } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt index 1a556f8d..a7d2d70a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -30,13 +30,8 @@ import java.time.Instant * A [TableReader] for the Bitbrains resource state table. */ internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - override fun nextRow(): Boolean { - state.reset() + reset() var line: String var num = 0 @@ -75,18 +70,18 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : val field = line.subSequence(start, end) as String when (col++) { - COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10)) - COL_CPU_USAGE -> state.cpuUsage = field.toDouble() - COL_CPU_DEMAND -> state.cpuDemand = field.toDouble() - COL_DISK_READ -> state.diskRead = field.toDouble() - COL_DISK_WRITE -> state.diskWrite = field.toDouble() - COL_CLUSTER_ID -> state.cluster = field.trim() - COL_NCPUS -> state.cpuCores = field.toInt(10) - COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble() - COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1 - COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble() - COL_ID -> state.id = field.trim() - COL_MEM_CAPACITY -> state.memCapacity = field.toDouble() + COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) + COL_CPU_USAGE -> cpuUsage = field.toDouble() + COL_CPU_DEMAND -> cpuDemand = field.toDouble() + COL_DISK_READ -> diskRead = field.toDouble() + COL_DISK_WRITE -> diskWrite = field.toDouble() + COL_CLUSTER_ID -> cluster = field.trim() + COL_NCPUS -> cpuCores = field.toInt(10) + COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() + COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 + COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() + COL_ID -> id = field.trim() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() } } @@ -113,16 +108,16 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun <T> get(column: TableColumn<T>): T { val res: Any? = when (column) { - RESOURCE_STATE_ID -> state.id - RESOURCE_STATE_CLUSTER_ID -> state.cluster - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_ID -> id + RESOURCE_STATE_CLUSTER_ID -> cluster + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) + RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) + RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) else -> throw IllegalArgumentException("Invalid column") } @@ -132,14 +127,14 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getBoolean(column: TableColumn<Boolean>): Boolean { return when (column) { - RESOURCE_STATE_POWERED_ON -> state.poweredOn + RESOURCE_STATE_POWERED_ON -> poweredOn else -> throw IllegalArgumentException("Invalid column") } } override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -150,12 +145,13 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getDouble(column: TableColumn<Double>): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity + RESOURCE_STATE_CPU_DEMAND -> cpuDemand + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite else -> throw IllegalArgumentException("Invalid column") } } @@ -165,51 +161,37 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : } /** - * The current row state. + * State fields of the reader. */ - private class RowState { - @JvmField - var id: String? = null - @JvmField - var cluster: String? = null - @JvmField - var timestamp: Instant? = null - @JvmField - var cpuCores = -1 - @JvmField - var cpuCapacity = Double.NaN - @JvmField - var cpuUsage = Double.NaN - @JvmField - var cpuDemand = Double.NaN - @JvmField - var cpuReadyPct = Double.NaN - @JvmField - var memCapacity = Double.NaN - @JvmField - var diskRead = Double.NaN - @JvmField - var diskWrite = Double.NaN - @JvmField - var poweredOn: Boolean = false - - /** - * Reset the state. - */ - fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } + private var id: String? = null + private var cluster: String? = null + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuDemand = Double.NaN + private var cpuReadyPct = Double.NaN + private var memCapacity = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var poweredOn: Boolean = false + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + timestamp = null + cluster = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuDemand = Double.NaN + cpuReadyPct = Double.NaN + memCapacity = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + poweredOn = false } /** diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 44dec95b..e2e5ea6d 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -41,7 +41,7 @@ public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:sta * End time for the resource. */ @JvmField -public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java) /** * Number of CPUs for the resource. @@ -50,7 +50,7 @@ public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_t public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus") /** - * Memory capacity for the resource. + * Memory capacity for the resource in KB. */ @JvmField public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 64b7d465..fb9099bf 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -32,18 +32,13 @@ import java.util.regex.Pattern * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - init { parser.schema = schema } override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,12 +52,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> state.workflowId = parser.longValue - "JobID" -> state.jobId = parser.longValue - "SubmitTime" -> state.submitTime = parser.longValue - "RunTime" -> state.runtime = parser.longValue - "NProcs" -> state.nProcs = parser.intValue - "ReqNProcs" -> state.reqNProcs = parser.intValue + "WorkflowID" -> workflowId = parser.longValue + "JobID" -> jobId = parser.longValue + "SubmitTime" -> submitTime = parser.longValue + "RunTime" -> runtime = parser.longValue + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) } } @@ -85,13 +80,13 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun <T> get(column: TableColumn<T>): T { val res: Any = when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs - TASK_PARENTS -> state.dependencies + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs + TASK_PARENTS -> dependencies else -> throw IllegalArgumentException("Invalid column") } @@ -105,18 +100,18 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(column: TableColumn<Int>): Int { return when (column) { - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs else -> throw IllegalArgumentException("Invalid column") } } override fun getLong(column: TableColumn<Long>): Long { return when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") } } @@ -166,29 +161,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } /** - * The current row state. + * Reader state fields. */ - private class RowState { - var workflowId = -1L - var jobId = -1L - var submitTime = -1L - var runtime = -1L - var nProcs = -1 - var reqNProcs = -1 - var dependencies = emptySet<Long>() + private var workflowId = -1L + private var jobId = -1L + private var submitTime = -1L + private var runtime = -1L + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet<Long>() - /** - * Reset the state. - */ - fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } + /** + * Reset the state. + */ + private fun reset() { + workflowId = -1 + jobId = -1 + submitTime = -1 + runtime = -1 + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() } companion object { diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt index 7eff0f5a..a755a107 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt @@ -43,5 +43,5 @@ public class WtfTrace internal constructor(private val path: Path) : Trace { return WtfTaskTable(path) } - override fun toString(): String = "SwfTrace[$path]" + override fun toString(): String = "WtfTrace[$path]" } |
