From 55a4c8208cc44ac626f7b8c61a19d5ec725ec936 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 11:48:18 +0200 Subject: refactor(trace): Unify columns of different tables This change unifies columns of different tables used by trace formats. This concretely means that instead of having columns specific per table (e.g., RESOURCE_ID and RESOURCE_STATE_ID), with this changes these columns are shared between the tables with a single definition (RESOURCE_ID). --- .../compute/workload/ComputeWorkloadLoader.kt | 4 +- .../kotlin/org/opendc/trace/ResourceColumns.kt | 22 +++++++-- .../org/opendc/trace/ResourceStateColumns.kt | 54 +++++----------------- .../main/kotlin/org/opendc/trace/TableColumns.kt | 33 ++----------- .../main/kotlin/org/opendc/trace/TaskColumns.kt | 28 ++++++----- .../opendc/trace/azure/AzureResourceStateTable.kt | 2 +- .../trace/azure/AzureResourceStateTableReader.kt | 4 +- .../org/opendc/trace/azure/AzureTraceFormatTest.kt | 2 +- .../bitbrains/BitbrainsExResourceStateTable.kt | 10 ++-- .../BitbrainsExResourceStateTableReader.kt | 26 +++++------ .../trace/bitbrains/BitbrainsResourceStateTable.kt | 8 ++-- .../bitbrains/BitbrainsResourceStateTableReader.kt | 22 ++++----- .../bitbrains/BitbrainsResourceTableReader.kt | 2 +- .../opendc/trace/opendc/OdcVmResourceStateTable.kt | 4 +- .../trace/opendc/OdcVmResourceStateTableReader.kt | 10 ++-- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 2 +- .../org/opendc/trace/tools/TraceConverter.kt | 16 +++---- 17 files changed, 102 insertions(+), 147 deletions(-) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index c92b212f..8a2585ce 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -62,10 +62,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) { return try { while (reader.nextRow()) { - val id = reader.get(RESOURCE_STATE_ID) + val id = reader.get(RESOURCE_ID) val time = reader.get(RESOURCE_STATE_TIMESTAMP) val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cores = reader.getInt(RESOURCE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val fragment = SimTraceWorkload.Fragment( diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 219002e0..f1977945 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -29,28 +29,40 @@ import java.time.Instant * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn = stringColumn("resource:id") +public val RESOURCE_ID: TableColumn = column("resource:id") + +/** + * The cluster to which the resource belongs. + */ +@JvmField +public val RESOURCE_CLUSTER_ID: TableColumn = column("resource:cluster_id") /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn = TableColumn("resource:start_time", Instant::class.java) +public val RESOURCE_START_TIME: TableColumn = column("resource:start_time") /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn = TableColumn("resource:stop_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn = column("resource:stop_time") /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn = intColumn("resource:cpu_count") +public val RESOURCE_CPU_COUNT: TableColumn = column("resource:cpu_count") + +/** + * Total CPU capacity of the resource in MHz. + */ +@JvmField +public val RESOURCE_CPU_CAPACITY: TableColumn = column("resource:cpu_capacity") /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn = doubleColumn("resource:mem_capacity") +public val RESOURCE_MEM_CAPACITY: TableColumn = column("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index b683923b..44762da5 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -26,104 +26,74 @@ package org.opendc.trace import java.time.Duration import java.time.Instant -/** - * Identifier of the resource. - */ -@JvmField -public val RESOURCE_STATE_ID: TableColumn = stringColumn("resource_state:id") - -/** - * The cluster to which the resource belongs. - */ -@JvmField -public val RESOURCE_STATE_CLUSTER_ID: TableColumn = stringColumn("resource_state:cluster_id") - /** * Timestamp for the state. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn = TableColumn("resource_state:timestamp", Instant::class.java) +public val RESOURCE_STATE_TIMESTAMP: TableColumn = column("resource_state:timestamp") /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn = TableColumn("resource_state:duration", Duration::class.java) +public val RESOURCE_STATE_DURATION: TableColumn = column("resource_state:duration") /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn = booleanColumn("resource_state:powered_on") - -/** - * Number of CPUs for the resource. - */ -@JvmField -public val RESOURCE_STATE_CPU_COUNT: TableColumn = intColumn("resource_state:cpu_count") - -/** - * Total CPU capacity of the resource in MHz. - */ -@JvmField -public val RESOURCE_STATE_CPU_CAPACITY: TableColumn = doubleColumn("resource_state:cpu_capacity") +public val RESOURCE_STATE_POWERED_ON: TableColumn = column("resource_state:powered_on") /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn = doubleColumn("resource_state:cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: TableColumn = column("resource_state:cpu_usage") /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn = doubleColumn("resource_state:cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn = column("resource_state:cpu_usage_pct") /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn = doubleColumn("resource_state:cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: TableColumn = column("resource_state:cpu_demand") /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn = doubleColumn("resource_state:cpu_ready_pct") - -/** - * Memory capacity of the resource in KB. - */ -@JvmField -public val RESOURCE_STATE_MEM_CAPACITY: TableColumn = doubleColumn("resource_state:mem_capacity") +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn = column("resource_state:cpu_ready_pct") /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn = doubleColumn("resource_state:mem_usage") +public val RESOURCE_STATE_MEM_USAGE: TableColumn = column("resource_state:mem_usage") /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn = doubleColumn("resource_state:disk_read") +public val RESOURCE_STATE_DISK_READ: TableColumn = column("resource_state:disk_read") /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn = doubleColumn("resource_state:disk_write") +public val RESOURCE_STATE_DISK_WRITE: TableColumn = column("resource_state:disk_write") /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn = doubleColumn("resource_state:net_rx") +public val RESOURCE_STATE_NET_RX: TableColumn = column("resource_state:net_rx") /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn = doubleColumn("resource_state:net_tx") +public val RESOURCE_STATE_NET_TX: TableColumn = column("resource_state:net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt index 64920498..31a58360 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt @@ -24,36 +24,11 @@ package org.opendc.trace /** - * Construct a [TableColumn] with [Any] type. + * Construct a [TableColumn] with the specified [name] and type [T]. */ -public fun objectColumn(name: String): TableColumn = TableColumn(name, Any::class.java) +public inline fun column(name: String): TableColumn = column(name, T::class.java) /** - * Construct a [TableColumn] with a [String] type. + * Construct a [TableColumn] with the specified [name] and [type]. */ -public fun stringColumn(name: String): TableColumn = TableColumn(name, String::class.java) - -/** - * Construct a [TableColumn] with a [Number] type. - */ -public fun numberColumn(name: String): TableColumn = TableColumn(name, Number::class.java) - -/** - * Construct a [TableColumn] with an [Int] type. - */ -public fun intColumn(name: String): TableColumn = TableColumn(name, Int::class.java) - -/** - * Construct a [TableColumn] with a [Long] type. - */ -public fun longColumn(name: String): TableColumn = TableColumn(name, Long::class.java) - -/** - * Construct a [TableColumn] with a [Double] type. - */ -public fun doubleColumn(name: String): TableColumn = TableColumn(name, Double::class.java) - -/** - * Construct a [TableColumn] with a [Boolean] type. - */ -public fun booleanColumn(name: String): TableColumn = TableColumn(name, Boolean::class.java) +public fun column(name: String, type: Class): TableColumn = TableColumn(name, type) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 46920dce..d103bce4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -30,72 +30,70 @@ import java.time.Instant * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn = stringColumn("task:id") +public val TASK_ID: TableColumn = column("task:id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn = stringColumn("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn = column("task:workflow_id") /** - * A column containing the submit time of the task. + * A column containing the submission time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn = TableColumn("task:submit_time", type = Instant::class.java) +public val TASK_SUBMIT_TIME: TableColumn = column("task:submit_time") /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn = TableColumn("task:wait_time", type = Instant::class.java) +public val TASK_WAIT_TIME: TableColumn = column("task:wait_time") /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn = TableColumn("task:runtime", type = Duration::class.java) +public val TASK_RUNTIME: TableColumn = column("task:runtime") /** * A column containing the parents of a task. */ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_PARENTS: TableColumn> = TableColumn("task:parents", type = Set::class.java as Class>) +public val TASK_PARENTS: TableColumn> = column("task:parents") /** * A column containing the children of a task. */ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_CHILDREN: TableColumn> = TableColumn("task:children", type = Set::class.java as Class>) +public val TASK_CHILDREN: TableColumn> = column("task:children") /** * A column containing the requested CPUs of a task. */ @JvmField -public val TASK_REQ_NCPUS: TableColumn = intColumn("task:req_ncpus") +public val TASK_REQ_NCPUS: TableColumn = column("task:req_ncpus") /** * A column containing the allocated CPUs of a task. */ @JvmField -public val TASK_ALLOC_NCPUS: TableColumn = intColumn("task:alloc_ncpus") +public val TASK_ALLOC_NCPUS: TableColumn = column("task:alloc_ncpus") /** * A column containing the status of a task. */ @JvmField -public val TASK_STATUS: TableColumn = intColumn("task:status") +public val TASK_STATUS: TableColumn = column("task:status") /** * A column containing the group id of a task. */ @JvmField -public val TASK_GROUP_ID: TableColumn = intColumn("task:group_id") +public val TASK_GROUP_ID: TableColumn = column("task:group_id") /** * A column containing the user id of a task. */ @JvmField -public val TASK_USER_ID: TableColumn = intColumn("task:user_id") +public val TASK_USER_ID: TableColumn = column("task:user_id") diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index 84c9b347..e6b89465 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -47,7 +47,7 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa override val isSynthetic: Boolean = false override val columns: List> = listOf( - RESOURCE_STATE_ID, + RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE_PCT ) diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index c17a17ab..6c1cb770 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -62,7 +62,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta override fun hasColumn(column: TableColumn<*>): Boolean { return when (column) { - RESOURCE_STATE_ID -> true + RESOURCE_ID -> true RESOURCE_STATE_TIMESTAMP -> true RESOURCE_STATE_CPU_USAGE_PCT -> true else -> false @@ -71,7 +71,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta override fun get(column: TableColumn): T { val res: Any? = when (column) { - RESOURCE_STATE_ID -> id + RESOURCE_ID -> id RESOURCE_STATE_TIMESTAMP -> timestamp RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct else -> throw IllegalArgumentException("Invalid column") diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index e5735f0d..2c1a2125 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -103,7 +103,7 @@ class AzureTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_STATE_ID)) }, + { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_ID)) }, { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, { assertEquals(2.86979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) } ) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt index 4a60dff3..44a6c26e 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt @@ -47,16 +47,16 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { override val isSynthetic: Boolean = false override val columns: List> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_CLUSTER_ID, + RESOURCE_ID, + RESOURCE_CLUSTER_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_COUNT, - RESOURCE_STATE_CPU_CAPACITY, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, RESOURCE_STATE_CPU_DEMAND, RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_STATE_MEM_CAPACITY, + RESOURCE_MEM_CAPACITY, RESOURCE_STATE_DISK_READ, RESOURCE_STATE_DISK_WRITE, ) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index f1cf7307..5619e839 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -90,16 +90,16 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun hasColumn(column: TableColumn<*>): Boolean { return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_CLUSTER_ID -> true + RESOURCE_ID -> true + RESOURCE_CLUSTER_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_COUNT -> true - RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_CPU_COUNT -> true + RESOURCE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true RESOURCE_STATE_CPU_DEMAND -> true RESOURCE_STATE_CPU_READY_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_MEM_CAPACITY -> true RESOURCE_STATE_DISK_READ -> true RESOURCE_STATE_DISK_WRITE -> true else -> false @@ -108,14 +108,14 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun get(column: TableColumn): T { val res: Any? = when (column) { - RESOURCE_STATE_ID -> id - RESOURCE_STATE_CLUSTER_ID -> cluster + RESOURCE_ID -> id + RESOURCE_CLUSTER_ID -> cluster RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) - RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) + RESOURCE_CPU_CAPACITY -> getDouble(RESOURCE_CPU_CAPACITY) RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) - RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) else -> throw IllegalArgumentException("Invalid column") @@ -134,7 +134,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_CPU_COUNT -> cpuCores + RESOURCE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -145,11 +145,11 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getDouble(column: TableColumn): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_CPU_CAPACITY -> cpuCapacity RESOURCE_STATE_CPU_USAGE -> cpuUsage RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity RESOURCE_STATE_CPU_DEMAND -> cpuDemand - RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_MEM_CAPACITY -> memCapacity RESOURCE_STATE_DISK_READ -> diskRead RESOURCE_STATE_DISK_WRITE -> diskWrite else -> throw IllegalArgumentException("Invalid column") diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 7241b18b..f68e61dc 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -48,13 +48,13 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path override val isSynthetic: Boolean = false override val columns: List> = listOf( - RESOURCE_STATE_ID, + RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_COUNT, - RESOURCE_STATE_CPU_CAPACITY, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_MEM_CAPACITY, + RESOURCE_MEM_CAPACITY, RESOURCE_STATE_MEM_USAGE, RESOURCE_STATE_DISK_READ, RESOURCE_STATE_DISK_WRITE, diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 56e66f5c..54be5dea 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -113,13 +113,13 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun hasColumn(column: TableColumn<*>): Boolean { return when (column) { - RESOURCE_STATE_ID -> true + RESOURCE_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_COUNT -> true - RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_CPU_COUNT -> true + RESOURCE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_MEM_CAPACITY -> true RESOURCE_STATE_MEM_USAGE -> true RESOURCE_STATE_DISK_READ -> true RESOURCE_STATE_DISK_WRITE -> true @@ -131,13 +131,13 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun get(column: TableColumn): T { val res: Any? = when (column) { - RESOURCE_STATE_ID -> partition + RESOURCE_ID -> partition RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_COUNT -> cpuCores - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_CPU_COUNT -> cpuCores + RESOURCE_CPU_CAPACITY -> cpuCapacity RESOURCE_STATE_CPU_USAGE -> cpuUsage RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_MEM_CAPACITY -> memCapacity RESOURCE_STATE_MEM_USAGE -> memUsage RESOURCE_STATE_DISK_READ -> diskRead RESOURCE_STATE_DISK_WRITE -> diskWrite @@ -156,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_CPU_COUNT -> cpuCores + RESOURCE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -167,10 +167,10 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getDouble(column: TableColumn): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_CPU_CAPACITY -> cpuCapacity RESOURCE_STATE_CPU_USAGE -> cpuUsage RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_MEM_CAPACITY -> memCapacity RESOURCE_STATE_MEM_USAGE -> memUsage RESOURCE_STATE_DISK_READ -> diskRead RESOURCE_STATE_DISK_WRITE -> diskWrite diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt index c02dc5ae..146c04f0 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -49,7 +49,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms continue } - id = reader.get(RESOURCE_STATE_ID) + id = reader.get(RESOURCE_ID) return true } finally { reader.close() diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index bee4ba7e..39613070 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -35,10 +35,10 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { override val isSynthetic: Boolean = false override val columns: List> = listOf( - RESOURCE_STATE_ID, + RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_DURATION, - RESOURCE_STATE_CPU_COUNT, + RESOURCE_CPU_COUNT, RESOURCE_STATE_CPU_USAGE, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index df3bcfa6..e4b18735 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -57,10 +57,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun hasColumn(column: TableColumn<*>): Boolean { return when (column) { - RESOURCE_STATE_ID -> true + RESOURCE_ID -> true RESOURCE_STATE_TIMESTAMP -> true RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_CPU_COUNT -> true + RESOURCE_CPU_COUNT -> true RESOURCE_STATE_CPU_USAGE -> true else -> false } @@ -71,10 +71,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_STATE_ID -> record[COL_ID].toString() + RESOURCE_ID -> record[COL_ID].toString() RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) - RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) else -> throw IllegalArgumentException("Invalid column") } @@ -90,7 +90,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(column: TableColumn): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 42eb369e..9fb6028d 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -111,7 +111,7 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_STATE_ID)) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 322464cd..0b089904 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -154,21 +154,21 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var stopTime = Long.MIN_VALUE do { - id = reader.get(RESOURCE_STATE_ID) + id = reader.get(RESOURCE_ID) val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT)) + numCpus = max(numCpus, reader.getInt(RESOURCE_CPU_COUNT)) - memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) + memCapacity = max(memCapacity, reader.getDouble(RESOURCE_MEM_CAPACITY)) if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) } hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + } while (hasNextRow && id == reader.get(RESOURCE_ID)) // Sample only a fraction of the VMs if (random != null && random.nextDouble() > samplingFraction) { @@ -204,14 +204,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var lastTimestamp = 0L while (hasNextRow) { - val id = reader.get(RESOURCE_STATE_ID) + val id = reader.get(RESOURCE_ID) if (id !in selectedVms) { hasNextRow = reader.nextRow() continue } - val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cpuCount = reader.getInt(RESOURCE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() @@ -233,9 +233,9 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { break } - val shouldContinue = id == reader.get(RESOURCE_STATE_ID) && + val shouldContinue = id == reader.get(RESOURCE_ID) && abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 && - cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT) + cpuCount == reader.getInt(RESOURCE_CPU_COUNT) } while (shouldContinue) val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) -- cgit v1.2.3 From 768bfa0d2ae763e359d74612385ce43c41afb432 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:12:10 +0200 Subject: feat(trace): Support column lookup via index This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces. --- .../main/kotlin/org/opendc/trace/TableReader.kt | 107 ++++++++++++++++- .../org/opendc/trace/util/CompositeTableReader.kt | 110 +++++++++++++++++ .../opendc/trace/azure/AzureResourceStateTable.kt | 54 +-------- .../trace/azure/AzureResourceStateTableReader.kt | 48 ++++---- .../opendc/trace/azure/AzureResourceTableReader.kt | 62 +++++----- .../bitbrains/BitbrainsExResourceStateTable.kt | 54 +-------- .../BitbrainsExResourceStateTableReader.kt | 94 +++++++-------- .../trace/bitbrains/BitbrainsResourceStateTable.kt | 54 +-------- .../bitbrains/BitbrainsResourceStateTableReader.kt | 104 ++++++++-------- .../bitbrains/BitbrainsResourceTableReader.kt | 33 +++--- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 69 ++++++----- .../trace/opendc/OdcVmResourceStateTableReader.kt | 82 +++++++------ .../trace/opendc/OdcVmResourceTableReader.kt | 82 +++++++------ .../org/opendc/trace/swf/SwfTaskTableReader.kt | 72 +++++------ .../trace/wfformat/WfFormatTaskTableReader.kt | 62 +++++----- opendc-trace/opendc-trace-wtf/build.gradle.kts | 2 + .../org/opendc/trace/wtf/WtfTaskTableReader.kt | 132 +++++++++++++++------ 17 files changed, 688 insertions(+), 533 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt index b5e7669f..8a796e6c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -33,35 +33,130 @@ public interface TableReader : AutoCloseable { */ public fun nextRow(): Boolean + /** + * Resolve the index of the specified [column] for this reader. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + /** * Determine whether the [TableReader] supports the specified [column]. */ - public fun hasColumn(column: TableColumn<*>): Boolean + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param index The zero-based index of the column to check for a null value. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(index: Int): Boolean + + /** + * Obtain the object value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The object value of the column. + */ + public fun get(index: Int): Any? + + /** + * Obtain the boolean value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. + */ + public fun getBoolean(index: Int): Boolean + + /** + * Obtain the integer value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. + */ + public fun getInt(index: Int): Int + + /** + * Obtain the double value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. + */ + public fun getLong(index: Int): Long + + /** + * Obtain the double value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. + */ + public fun getDouble(index: Int): Double + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param column The column to lookup. + * @throws IllegalArgumentException if the column is not valid for this table. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column)) /** * Obtain the value of the current column with type [T]. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The object value of the column. */ - public fun get(column: TableColumn): T + public fun get(column: TableColumn): T { + // This cast should always succeed since the resolve the index of the typed column + @Suppress("UNCHECKED_CAST") + return get(resolve(column)) as T + } /** * Read the specified [column] as boolean. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. */ - public fun getBoolean(column: TableColumn): Boolean + public fun getBoolean(column: TableColumn): Boolean = getBoolean(resolve(column)) /** * Read the specified [column] as integer. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. */ - public fun getInt(column: TableColumn): Int + public fun getInt(column: TableColumn): Int = getInt(resolve(column)) /** * Read the specified [column] as long. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. */ - public fun getLong(column: TableColumn): Long + public fun getLong(column: TableColumn): Long = getLong(resolve(column)) /** * Read the specified [column] as double. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. */ - public fun getDouble(column: TableColumn): Double + public fun getDouble(column: TableColumn): Double = getDouble(resolve(column)) /** * Closes the reader so that no further iteration or data access can be made. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt new file mode 100644 index 00000000..dafc0798 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.util + +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader + +/** + * A helper class to chain multiple [TableReader]s. + */ +public abstract class CompositeTableReader : TableReader { + /** + * A flag to indicate that the reader has starting, meaning the user called [nextRow] at least once + * (and in turn [nextReader]). + */ + private var hasStarted = false + + /** + * The active [TableReader] instance. + */ + private var delegate: TableReader? = null + + /** + * Obtain the next [TableReader] instance to read from or `null` if there are no more readers to read from. + */ + protected abstract fun nextReader(): TableReader? + + override fun nextRow(): Boolean { + if (!hasStarted) { + assert(delegate == null) { "Duplicate initialization" } + delegate = nextReader() + hasStarted = true + } + + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextReader() + this.delegate = delegate + } + + return delegate != null + } + + override fun resolve(column: TableColumn<*>): Int { + val delegate = delegate + return delegate?.resolve(column) ?: -1 + } + + override fun isNull(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.isNull(index) + } + + override fun get(index: Int): Any? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(index) + } + + override fun getBoolean(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(index) + } + + override fun getInt(index: Int): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(index) + } + + override fun getLong(index: Int): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(index) + } + + override fun getDouble(index: Int): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(index) + } + + override fun close() { + delegate?.close() + } + + override fun toString(): String = "CompositeTableReader" +} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index e6b89465..8f2f5cc9 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -24,6 +24,7 @@ package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* +import org.opendc.trace.util.CompositeTableReader import java.nio.file.Files import java.nio.file.Path import java.util.stream.Collectors @@ -55,57 +56,8 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa override fun newReader(): TableReader { val it = partitions.iterator() - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - this.delegate = delegate - } - - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun get(column: TableColumn): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { return if (it.hasNext()) { val (_, path) = it.next() return AzureResourceStateTableReader(factory.createParser(path.toFile())) diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index 6c1cb770..da8181fe 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -60,42 +60,37 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..columns.size) { "Invalid column index" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_ID -> id - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_TIMESTAMP -> timestamp + COL_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column index") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { + override fun getInt(index: Int): Int { throw IllegalArgumentException("Invalid column") } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { - return when (column) { - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + override fun getDouble(index: Int): Double { + return when (index) { + COL_CPU_USAGE_PCT -> cpuUsagePct else -> throw IllegalArgumentException("Invalid column") } } @@ -133,6 +128,15 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta cpuUsagePct = Double.NaN } + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_CPU_USAGE_PCT = 2 + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index 5ea97483..a6352613 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -62,49 +62,42 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..columns.size) { "Invalid column index" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_ID -> id - RESOURCE_START_TIME -> startTime - RESOURCE_STOP_TIME -> stopTime - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_START_TIME -> startTime + COL_STOP_TIME -> stopTime + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { - return when (column) { - RESOURCE_CPU_COUNT -> cpuCores + override fun getInt(index: Int): Int { + return when (index) { + COL_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { - return when (column) { - RESOURCE_MEM_CAPACITY -> memCapacity + override fun getDouble(index: Int): Double { + return when (index) { + COL_MEM_CAPACITY -> memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -138,7 +131,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe /** * Reset the state. */ - fun reset() { + private fun reset() { id = null startTime = null stopTime = null @@ -146,6 +139,19 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe memCapacity = Double.NaN } + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt index 44a6c26e..ab768608 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt @@ -23,6 +23,7 @@ package org.opendc.trace.bitbrains import org.opendc.trace.* +import org.opendc.trace.util.CompositeTableReader import java.nio.file.Files import java.nio.file.Path import java.util.stream.Collectors @@ -64,57 +65,8 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { override fun newReader(): TableReader { val it = partitions.iterator() - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - this.delegate = delegate - } - - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun get(column: TableColumn): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { return if (it.hasNext()) { val (_, path) = it.next() val reader = path.bufferedReader() diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index 5619e839..c1b6f5ba 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -88,70 +88,53 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_CLUSTER_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_CPU_DEMAND -> true - RESOURCE_STATE_CPU_READY_PCT -> true - RESOURCE_MEM_CAPACITY -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..COL_MAX) { "Invalid column index" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_ID -> id - RESOURCE_CLUSTER_ID -> cluster - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_CPU_CAPACITY -> getDouble(RESOURCE_CPU_CAPACITY) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) - RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) - RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) - RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_CLUSTER_ID -> cluster + COL_TIMESTAMP -> timestamp + COL_NCPUS -> getInt(index) + COL_POWERED_ON -> getInt(index) + COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_CPU_READY_PCT, COL_CPU_DEMAND, COL_MEM_CAPACITY, COL_DISK_READ, COL_DISK_WRITE -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { - return when (column) { - RESOURCE_STATE_POWERED_ON -> poweredOn + override fun getBoolean(index: Int): Boolean { + return when (index) { + COL_POWERED_ON -> poweredOn else -> throw IllegalArgumentException("Invalid column") } } - override fun getInt(column: TableColumn): Int { - return when (column) { - RESOURCE_CPU_COUNT -> cpuCores + override fun getInt(index: Int): Int { + return when (index) { + COL_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { - return when (column) { - RESOURCE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity - RESOURCE_STATE_CPU_DEMAND -> cpuDemand - RESOURCE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite + override fun getDouble(index: Int): Double { + return when (index) { + COL_CPU_CAPACITY -> cpuCapacity + COL_CPU_USAGE -> cpuUsage + COL_CPU_USAGE_PCT -> cpuUsage / cpuCapacity + COL_CPU_READY_PCT -> cpuReadyPct + COL_CPU_DEMAND -> cpuDemand + COL_MEM_CAPACITY -> memCapacity + COL_DISK_READ -> diskRead + COL_DISK_WRITE -> diskWrite else -> throw IllegalArgumentException("Invalid column") } } @@ -209,4 +192,21 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR private val COL_CPU_CAPACITY = 18 private val COL_ID = 19 private val COL_MEM_CAPACITY = 20 + private val COL_CPU_USAGE_PCT = 21 + private val COL_MAX = COL_CPU_USAGE_PCT + 1 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_CLUSTER_ID to COL_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_CPU_COUNT to COL_NCPUS, + RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND to COL_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT to COL_CPU_READY_PCT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ to COL_DISK_READ, + RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE + ) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index f68e61dc..6b6ac9da 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -24,6 +24,7 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* +import org.opendc.trace.util.CompositeTableReader import java.nio.file.Files import java.nio.file.Path import java.util.stream.Collectors @@ -65,57 +66,8 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path override fun newReader(): TableReader { val it = partitions.iterator() - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - this.delegate = delegate - } - - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun get(column: TableColumn): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { return if (it.hasNext()) { val (partition, path) = it.next() return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 54be5dea..3a8839b4 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -111,71 +111,49 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_MEM_CAPACITY -> true - RESOURCE_STATE_MEM_USAGE -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - RESOURCE_STATE_NET_RX -> true - RESOURCE_STATE_NET_TX -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_ID -> partition - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_CPU_COUNT -> cpuCores - RESOURCE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - RESOURCE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_MEM_USAGE -> memUsage - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite - RESOURCE_STATE_NET_RX -> netReceived - RESOURCE_STATE_NET_TX -> netTransmitted + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> partition + COL_TIMESTAMP -> timestamp + COL_CPU_COUNT -> getInt(index) + COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_MEM_CAPACITY, COL_MEM_USAGE, COL_DISK_READ, COL_DISK_WRITE, COL_NET_RX, COL_NET_TX -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { - return when (column) { - RESOURCE_CPU_COUNT -> cpuCores + override fun getInt(index: Int): Int { + return when (index) { + COL_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { - return when (column) { - RESOURCE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - RESOURCE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_MEM_USAGE -> memUsage - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite - RESOURCE_STATE_NET_RX -> netReceived - RESOURCE_STATE_NET_TX -> netTransmitted + override fun getDouble(index: Int): Double { + return when (index) { + COL_CPU_CAPACITY -> cpuCapacity + COL_CPU_USAGE -> cpuUsage + COL_CPU_USAGE_PCT -> cpuUsagePct + COL_MEM_CAPACITY -> memCapacity + COL_MEM_USAGE -> memUsage + COL_DISK_READ -> diskRead + COL_DISK_WRITE -> diskWrite + COL_NET_RX -> netReceived + COL_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } } @@ -249,6 +227,34 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, netTransmitted = Double.NaN } + private val COL_TIMESTAMP = 0 + private val COL_CPU_COUNT = 1 + private val COL_CPU_CAPACITY = 2 + private val COL_CPU_USAGE = 3 + private val COL_CPU_USAGE_PCT = 4 + private val COL_MEM_CAPACITY = 5 + private val COL_MEM_USAGE = 6 + private val COL_DISK_READ = 7 + private val COL_DISK_WRITE = 8 + private val COL_NET_RX = 9 + private val COL_NET_TX = 10 + private val COL_ID = 11 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE to COL_MEM_USAGE, + RESOURCE_STATE_DISK_READ to COL_DISK_READ, + RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE, + RESOURCE_STATE_NET_RX to COL_NET_RX, + RESOURCE_STATE_NET_TX to COL_NET_TX + ) + /** * The type of the timestamp in the trace. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt index 146c04f0..3701994a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -43,13 +43,14 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms val parser = factory.createParser(path.toFile()) val reader = BitbrainsResourceStateTableReader(name, parser) + val idCol = reader.resolve(RESOURCE_ID) try { if (!reader.nextRow()) { continue } - id = reader.get(RESOURCE_ID) + id = reader.get(idCol) as String return true } finally { reader.close() @@ -59,36 +60,33 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms return false } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - RESOURCE_ID -> id + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { + override fun getInt(index: Int): Int { throw IllegalArgumentException("Invalid column") } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -105,4 +103,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms private fun reset() { id = null } + + private val COL_ID = 0 + private val columns = mapOf(RESOURCE_ID to COL_ID) } diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 39eb5520..aa4c543b 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -67,52 +67,43 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - TASK_WORKFLOW_ID -> workflowId - TASK_ID -> jobId - TASK_SUBMIT_TIME -> submitTime - TASK_RUNTIME -> runtime - TASK_REQ_NCPUS -> nProcs - TASK_ALLOC_NCPUS -> reqNProcs - TASK_PARENTS -> dependencies + override fun get(index: Int): Any? { + return when (index) { + COL_JOB_ID -> jobId + COL_WORKFLOW_ID -> workflowId + COL_SUBMIT_TIME -> submitTime + COL_RUNTIME -> runtime + COL_REQ_NPROC -> getInt(index) + COL_NPROC -> getInt(index) + COL_DEPS -> dependencies else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { - return when (column) { - TASK_REQ_NCPUS -> nProcs - TASK_ALLOC_NCPUS -> reqNProcs + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NPROC -> reqNProcs + COL_NPROC -> nProcs else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -180,6 +171,24 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { dependencies = emptySet() } + private val COL_WORKFLOW_ID = 0 + private val COL_JOB_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_RUNTIME = 3 + private val COL_NPROC = 4 + private val COL_REQ_NPROC = 5 + private val COL_DEPS = 6 + + private val columns = mapOf( + TASK_ID to COL_JOB_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_RUNTIME to COL_RUNTIME, + TASK_ALLOC_NCPUS to COL_NPROC, + TASK_REQ_NCPUS to COL_REQ_NPROC, + TASK_PARENTS to COL_DEPS + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index e4b18735..b5043f82 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_DURATION -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun get(column: TableColumn): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) + COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) + COL_CPU_COUNT -> getInt(index) + COL_CPU_USAGE -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() + return when (index) { + COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - COL_DURATION = schema.getField("duration").pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + AVRO_COL_DURATION = schema.getField("duration").pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema", e) } } - private var COL_ID = -1 - private var COL_TIMESTAMP = -1 - private var COL_DURATION = -1 - private var COL_CPU_COUNT = -1 - private var COL_CPU_USAGE = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_TIMESTAMP = -1 + private var AVRO_COL_DURATION = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var AVRO_COL_CPU_USAGE = -1 + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index c52da62d..d93929aa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun get(column: TableColumn): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) + COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() + return when (index) { + COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader): Boolean { - return when (column) { - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - TASK_STATUS -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in columns.values) { "Invalid column index" } + return false } - override fun get(column: TableColumn): T { - val res: Any = when (column) { - TASK_ID -> fields[COL_JOB_ID] - TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) - TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) - TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) - TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) - TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) - TASK_PARENTS -> { - val parent = fields[COL_PARENT_JOB].toLong(10) + override fun get(index: Int): Any? { + return when (index) { + COL_JOB_ID -> fields[index] + COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) + COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) + COL_PARENT_JOB -> { + val parent = fields[index].toLong(10) if (parent < 0) emptySet() else setOf(parent) } - TASK_STATUS -> getInt(TASK_STATUS) - TASK_GROUP_ID -> getInt(TASK_GROUP_ID) - TASK_USER_ID -> getInt(TASK_USER_ID) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { - return when (column) { - TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10) - TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10) - TASK_STATUS -> fields[COL_STATUS].toInt(10) - TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10) - TASK_USER_ID -> fields[COL_USER_ID].toInt(10) + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -155,4 +134,17 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val COL_PART_NUM = 15 private val COL_PARENT_JOB = 16 private val COL_PARENT_THINK_TIME = 17 + + private val columns = mapOf( + TASK_ID to COL_JOB_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_WAIT_TIME to COL_WAIT_TIME, + TASK_RUNTIME to COL_RUN_TIME, + TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS, + TASK_REQ_NCPUS to COL_REQ_NCPUS, + TASK_STATUS to COL_STATUS, + TASK_USER_ID to COL_USER_ID, + TASK_GROUP_ID to COL_GROUP_ID, + TASK_PARENTS to COL_PARENT_JOB + ) } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index 4408ba5c..7f378d80 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -94,49 +94,41 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe return hasJob } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column value" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - TASK_ID -> id - TASK_WORKFLOW_ID -> workflowId - TASK_RUNTIME -> runtime - TASK_PARENTS -> parents - TASK_CHILDREN -> children - TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_WORKFLOW_ID -> workflowId + COL_RUNTIME -> runtime + COL_PARENTS -> parents + COL_CHILDREN -> children + COL_NPROC -> getInt(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { - return when (column) { - TASK_REQ_NCPUS -> cores + override fun getInt(index: Int): Int { + return when (index) { + COL_NPROC -> cores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -231,4 +223,20 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe children = null cores = -1 } + + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_RUNTIME = 3 + private val COL_NPROC = 4 + private val COL_PARENTS = 5 + private val COL_CHILDREN = 6 + + private val columns = mapOf( + TASK_ID to COL_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_RUNTIME to COL_RUNTIME, + TASK_REQ_NCPUS to COL_NPROC, + TASK_PARENTS to COL_PARENTS, + TASK_CHILDREN to COL_CHILDREN, + ) } diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts index 5051c7b0..e4f0ab3a 100644 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -34,4 +34,6 @@ dependencies { api(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) + + testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index 5e2463f8..45ec25dd 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.wtf +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -37,73 +38,126 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun get(column: TableColumn): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - TASK_ID -> (record["id"] as Long).toString() - TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString() - TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long) - TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long) - TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long) - TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_PARENTS -> (record["parents"] as ArrayList).map { it["item"].toString() }.toSet() - TASK_CHILDREN -> (record["children"] as ArrayList).map { it["item"].toString() }.toSet() - TASK_GROUP_ID -> record["group_id"] - TASK_USER_ID -> record["user_id"] + return when (index) { + COL_ID -> (record[AVRO_COL_ID] as Long).toString() + COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString() + COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long) + COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long) + COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long) + COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) + COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList).map { it["item"].toString() }.toSet() + COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList).map { it["item"].toString() }.toSet() else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_GROUP_ID -> record["group_id"] as Int - TASK_USER_ID -> record["user_id"] as Int + return when (index) { + COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt() + COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int + COL_USER_ID -> record[AVRO_COL_USER_ID] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } override fun close() { reader.close() } + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos() + AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos() + AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos() + AVRO_COL_RUNTIME = schema.getField("runtime").pos() + AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos() + AVRO_COL_PARENTS = schema.getField("parents").pos() + AVRO_COL_CHILDREN = schema.getField("children").pos() + AVRO_COL_GROUP_ID = schema.getField("group_id").pos() + AVRO_COL_USER_ID = schema.getField("user_id").pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema", e) + } + } + + private var AVRO_COL_ID = -1 + private var AVRO_COL_WORKFLOW_ID = -1 + private var AVRO_COL_SUBMIT_TIME = -1 + private var AVRO_COL_WAIT_TIME = -1 + private var AVRO_COL_RUNTIME = -1 + private var AVRO_COL_REQ_NCPUS = -1 + private var AVRO_COL_PARENTS = -1 + private var AVRO_COL_CHILDREN = -1 + private var AVRO_COL_GROUP_ID = -1 + private var AVRO_COL_USER_ID = -1 + + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_WAIT_TIME = 3 + private val COL_RUNTIME = 4 + private val COL_REQ_NCPUS = 5 + private val COL_PARENTS = 6 + private val COL_CHILDREN = 7 + private val COL_GROUP_ID = 8 + private val COL_USER_ID = 9 + + private val columns = mapOf( + TASK_ID to COL_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_WAIT_TIME to COL_WAIT_TIME, + TASK_RUNTIME to COL_RUNTIME, + TASK_REQ_NCPUS to COL_REQ_NCPUS, + TASK_PARENTS to COL_PARENTS, + TASK_CHILDREN to COL_CHILDREN, + TASK_GROUP_ID to COL_GROUP_ID, + TASK_USER_ID to COL_USER_ID, + ) } -- cgit v1.2.3 From e765316cfc089dd50602759ea7afbcc7a3cd4c00 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:17:49 +0200 Subject: perf(compute): Use index lookup in trace loader This change updates the ComputeWorkloadLoader to use index column lookups in order to prevent having to lookup the index for every row. --- .../compute/workload/ComputeWorkloadLoader.kt | 34 +++++++++++++++------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 8a2585ce..ab7f051f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -27,6 +27,8 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.trace.* import org.opendc.trace.opendc.OdcVmTraceFormat import java.io.File +import java.time.Duration +import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap import kotlin.math.roundToLong @@ -58,15 +60,21 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private fun parseFragments(trace: Trace): Map> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val durationCol = reader.resolve(RESOURCE_STATE_DURATION) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + val fragments = mutableMapOf>() return try { while (reader.nextRow()) { - val id = reader.get(RESOURCE_ID) - val time = reader.get(RESOURCE_STATE_TIMESTAMP) - val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_CPU_COUNT) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) + val id = reader.get(idCol) as String + val time = reader.get(timestampCol) as Instant + val duration = reader.get(durationCol) as Duration + val cores = reader.getInt(coresCol) + val cpuUsage = reader.getDouble(usageCol) val fragment = SimTraceWorkload.Fragment( time.toEpochMilli(), @@ -90,21 +98,27 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private fun parseMeta(trace: Trace, fragments: Map>): List { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + val idCol = reader.resolve(RESOURCE_ID) + val startTimeCol = reader.resolve(RESOURCE_START_TIME) + val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) + var counter = 0 val entries = mutableListOf() return try { while (reader.nextRow()) { - val id = reader.get(RESOURCE_ID) + val id = reader.get(idCol) as String if (!fragments.containsKey(id)) { continue } - val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_STOP_TIME) - val maxCores = reader.getInt(RESOURCE_CPU_COUNT) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB + val submissionTime = reader.get(startTimeCol) as Instant + val endTime = reader.get(stopTimeCol) as Instant + val maxCores = reader.getInt(coresCol) + val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() -- cgit v1.2.3 From 140aafdaa711b0fdeacf99b9c7e70b706b8490f4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:40:13 +0200 Subject: feat(trace): Add property for describing partition keys --- .../opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt | 5 +++++ .../main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt | 2 ++ .../src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt | 2 ++ .../org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt | 2 ++ .../kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt | 2 ++ .../main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt | 2 ++ .../src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 2 ++ .../main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt | 2 ++ .../src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt | 2 ++ .../src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt | 2 ++ .../src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt | 2 ++ .../src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt | 2 ++ 12 files changed, 27 insertions(+) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 6aca2051..164f5084 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -41,6 +41,11 @@ public interface Table { */ public val columns: List> + /** + * The columns by which the table is partitioned. + */ + public val partitionKeys: List> + /** * Open a [TableReader] for this table. */ diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index 8f2f5cc9..285e7216 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -53,6 +53,8 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa RESOURCE_STATE_CPU_USAGE_PCT ) + override val partitionKeys: List> = listOf(RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt index 96ee3158..ff7af172 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt @@ -42,6 +42,8 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p RESOURCE_MEM_CAPACITY ) + override val partitionKeys: List> = emptyList() + override fun newReader(): TableReader { return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt index ab768608..7cb58226 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt @@ -62,6 +62,8 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { RESOURCE_STATE_DISK_WRITE, ) + override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 6b6ac9da..7b08b8be 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -63,6 +63,8 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path RESOURCE_STATE_NET_TX, ) + override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt index bc4f0b7d..d024af2d 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt @@ -49,6 +49,8 @@ internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Pat override val columns: List> = listOf(RESOURCE_ID) + override val partitionKeys: List> = emptyList() + override fun newReader(): TableReader { return BitbrainsResourceTableReader(factory, vms) } diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index fd7bd068..ca720de4 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -44,6 +44,8 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR TASK_PARENTS ) + override val partitionKeys: List> = listOf(TASK_WORKFLOW_ID) + override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index 39613070..caacf192 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -42,6 +42,8 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { RESOURCE_STATE_CPU_USAGE, ) + override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val reader = LocalParquetReader(path.resolve("trace.parquet")) return OdcVmResourceStateTableReader(reader) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt index b1456560..653b28b8 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt @@ -42,6 +42,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table { RESOURCE_MEM_CAPACITY, ) + override val partitionKeys: List> = emptyList() + override fun newReader(): TableReader { val reader = LocalParquetReader(path.resolve("meta.parquet")) return OdcVmResourceTableReader(reader) diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt index 7ec0d607..4898779d 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -47,6 +47,8 @@ internal class SwfTaskTable(private val path: Path) : Table { TASK_USER_ID ) + override val partitionKeys: List> = emptyList() + override fun newReader(): TableReader { val reader = path.bufferedReader() return SwfTaskTableReader(reader) diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt index 7b7f979f..17aeee97 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt @@ -43,6 +43,8 @@ internal class WfFormatTaskTable(private val factory: JsonFactory, private val p TASK_CHILDREN ) + override val partitionKeys: List> = emptyList() + override fun newReader(): TableReader { val parser = factory.createParser(path.toFile()) return WfFormatTaskTableReader(parser) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt index 74202718..410bb347 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt @@ -48,6 +48,8 @@ internal class WtfTaskTable(private val path: Path) : Table { TASK_USER_ID ) + override val partitionKeys: List> = listOf(TASK_SUBMIT_TIME) + override fun newReader(): TableReader { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0")) return WtfTaskTableReader(reader) -- cgit v1.2.3 From c7fff03408ee3109d0a39a96c043584a2d8f67ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 22:04:23 +0200 Subject: refactor(trace): Simplify TraceFormat SPI interface This change simplifies the TraceFormat SPI interface by reducing the number of interfaces that implementors need to implement to only TraceFormat. --- .../compute/workload/ComputeWorkloadLoader.kt | 8 +- .../src/main/kotlin/org/opendc/trace/Table.kt | 10 --- .../src/main/kotlin/org/opendc/trace/Trace.kt | 19 ++--- .../kotlin/org/opendc/trace/internal/TableImpl.kt | 52 ++++++++++++ .../kotlin/org/opendc/trace/internal/TraceImpl.kt | 56 +++++++++++++ .../kotlin/org/opendc/trace/spi/TableDetails.kt | 37 +++++++++ .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 33 ++++++-- .../opendc/trace/azure/AzureResourceStateTable.kt | 81 ------------------- .../org/opendc/trace/azure/AzureResourceTable.kt | 56 ------------- .../kotlin/org/opendc/trace/azure/AzureTrace.kt | 46 ----------- .../org/opendc/trace/azure/AzureTraceFormat.kt | 69 ++++++++++++++-- .../org/opendc/trace/azure/AzureTraceFormatTest.kt | 50 +++--------- .../bitbrains/BitbrainsExResourceStateTable.kt | 92 ---------------------- .../org/opendc/trace/bitbrains/BitbrainsExTrace.kt | 45 ----------- .../trace/bitbrains/BitbrainsExTraceFormat.kt | 69 ++++++++++++++-- .../trace/bitbrains/BitbrainsResourceStateTable.kt | 91 --------------------- .../trace/bitbrains/BitbrainsResourceTable.kt | 63 --------------- .../org/opendc/trace/bitbrains/BitbrainsTrace.kt | 46 ----------- .../opendc/trace/bitbrains/BitbrainsTraceFormat.kt | 76 ++++++++++++++++-- .../trace/bitbrains/BitbrainsExTraceFormatTest.kt | 44 +++-------- .../trace/bitbrains/BitbrainsTraceFormatTest.kt | 55 +++---------- .../kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 58 -------------- .../main/kotlin/org/opendc/trace/gwf/GwfTrace.kt | 46 ----------- .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 38 ++++++--- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 61 +++----------- .../opendc/trace/opendc/OdcVmResourceStateTable.kt | 55 ------------- .../org/opendc/trace/opendc/OdcVmResourceTable.kt | 55 ------------- .../kotlin/org/opendc/trace/opendc/OdcVmTrace.kt | 49 ------------ .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 54 ++++++++++--- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 47 +++-------- .../kotlin/org/opendc/trace/swf/SwfTaskTable.kt | 62 --------------- .../main/kotlin/org/opendc/trace/swf/SwfTrace.kt | 46 ----------- .../kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 39 +++++++-- .../org/opendc/trace/swf/SwfTraceFormatTest.kt | 53 +++---------- .../org/opendc/trace/tools/TraceConverter.kt | 11 +-- .../org/opendc/trace/wfformat/WfFormatTaskTable.kt | 58 -------------- .../org/opendc/trace/wfformat/WfFormatTrace.kt | 47 ----------- .../opendc/trace/wfformat/WfFormatTraceFormat.kt | 34 ++++++-- .../trace/wfformat/WfFormatTraceFormatTest.kt | 58 +++----------- .../kotlin/org/opendc/trace/wtf/WtfTaskTable.kt | 63 --------------- .../main/kotlin/org/opendc/trace/wtf/WtfTrace.kt | 47 ----------- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 43 ++++++++-- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 57 +++----------- .../opendc-workflow-service/build.gradle.kts | 3 +- .../opendc/workflow/service/WorkflowServiceTest.kt | 8 +- 45 files changed, 635 insertions(+), 1555 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt delete mode 100644 opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt delete mode 100644 opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt delete mode 100644 opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt delete mode 100644 opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt delete mode 100644 opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt delete mode 100644 opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt delete mode 100644 opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt delete mode 100644 opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt delete mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt delete mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt delete mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt delete mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt delete mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt delete mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt delete mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt delete mode 100644 opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt delete mode 100644 opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt delete mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt delete mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index ab7f051f..6dba41e6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -25,7 +25,6 @@ package org.opendc.compute.workload import mu.KotlinLogging import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.trace.* -import org.opendc.trace.opendc.OdcVmTraceFormat import java.io.File import java.time.Duration import java.time.Instant @@ -44,11 +43,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { */ private val logger = KotlinLogging.logger {} - /** - * The [OdcVmTraceFormat] instance to load the traces - */ - private val format = OdcVmTraceFormat() - /** * The cache of workloads. */ @@ -159,7 +153,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { logger.info { "Loading trace $it at $path" } - val trace = format.open(path.toURI().toURL()) + val trace = Trace.open(path, format = "opendc-vm") val fragments = parseFragments(trace) parseMeta(trace, fragments) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 164f5084..031ee269 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -31,11 +31,6 @@ public interface Table { */ public val name: String - /** - * A flag to indicate that the table is synthetic (derived from another table). - */ - public val isSynthetic: Boolean - /** * The list of columns supported in this table. */ @@ -50,9 +45,4 @@ public interface Table { * Open a [TableReader] for this table. */ public fun newReader(): TableReader - - /** - * Open a [TableReader] for [partition] of the table. - */ - public fun newReader(partition: String): TableReader } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 0ae45e86..6d0014cb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,9 +22,9 @@ package org.opendc.trace +import org.opendc.trace.internal.TraceImpl import org.opendc.trace.spi.TraceFormat import java.io.File -import java.net.URL import java.nio.file.Path /** @@ -47,32 +47,25 @@ public interface Trace { public fun getTable(name: String): Table? public companion object { - /** - * Open a [Trace] at the specified [url] in the given [format]. - * - * @throws IllegalArgumentException if [format] is not supported. - */ - public fun open(url: URL, format: String): Trace { - val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } - return provider.open(url) - } - /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The path to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: File, format: String): Trace { - return open(path.toURI().toURL(), format) + return open(path.toPath(), format) } /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The [Path] to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: Path, format: String): Trace { - return open(path.toUri().toURL(), format) + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return TraceImpl(provider, path) } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt new file mode 100644 index 00000000..fd0a0f04 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.internal + +import org.opendc.trace.Table +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.util.* + +/** + * Internal implementation of [Table]. + */ +internal class TableImpl(val trace: TraceImpl, override val name: String) : Table { + /** + * The details of this table. + */ + private val details = trace.format.getDetails(trace.path, name) + + override val columns: List> + get() = details.columns + + override val partitionKeys: List> + get() = details.partitionKeys + + override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + + override fun toString(): String = "Table[name=$name]" + + override fun hashCode(): Int = Objects.hash(trace, name) + + override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt new file mode 100644 index 00000000..fd9536ab --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.internal + +import org.opendc.trace.Table +import org.opendc.trace.Trace +import org.opendc.trace.spi.TraceFormat +import java.nio.file.Path +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +/** + * Internal implementation of the [Trace] interface. + */ +internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace { + /** + * A map containing the [TableImpl] instances associated with the trace. + */ + private val tableMap = ConcurrentHashMap() + + override val tables: List = format.getTables(path) + + init { + for (table in tables) { + tableMap.computeIfAbsent(table) { TableImpl(this, it) } + } + } + + override fun containsTable(name: String): Boolean = tableMap.containsKey(name) + + override fun getTable(name: String): Table? = tableMap[name] + + override fun hashCode(): Int = Objects.hash(format, path) + + override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt new file mode 100644 index 00000000..1a9b9ee1 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.spi + +import org.opendc.trace.Table +import org.opendc.trace.TableColumn + +/** + * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. + * + * @param columns The available columns in the table. + * @param partitionKeys The table columns that act as partition keys for the table. + */ +public data class TableDetails( + val columns: List>, + val partitionKeys: List> = emptyList() +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 54029fcf..e04dd948 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,8 +22,8 @@ package org.opendc.trace.spi -import org.opendc.trace.Trace -import java.net.URL +import org.opendc.trace.TableReader +import java.nio.file.Path import java.util.* /** @@ -36,11 +36,32 @@ public interface TraceFormat { public val name: String /** - * Open a new [Trace] with this provider. + * Return the name of the tables available in the trace at the specified [path]. * - * @param url A reference to the trace. + * @param path The path to the trace. + * @return The list of tables available in the trace. */ - public fun open(url: URL): Trace + public fun getTables(path: Path): List + + /** + * Return the details of [table] in the trace at the specified [path]. + * + * @param path The path to the trace. + * @param table The name of the table to obtain the details for. + * @throws IllegalArgumentException If [table] does not exist. + * @return The [TableDetails] for the specified [table]. + */ + public fun getDetails(path: Path, table: String): TableDetails + + /** + * Open a [TableReader] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableReader] for. + * @throws IllegalArgumentException If [table] does not exist. + * @return A [TableReader] instance for the table. + */ + public fun newReader(path: Path, table: String): TableReader /** * A helper object for resolving providers. @@ -49,6 +70,7 @@ public interface TraceFormat { /** * A list of [TraceFormat] that are available on this system. */ + @JvmStatic public val installedProviders: List by lazy { val loader = ServiceLoader.load(TraceFormat::class.java) loader.toList() @@ -57,6 +79,7 @@ public interface TraceFormat { /** * Obtain a [TraceFormat] implementation by [name]. */ + @JvmStatic public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name } } } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt deleted file mode 100644 index 285e7216..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ) - - override val partitionKeys: List> = listOf(RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } - - override fun toString(): String = "AzureResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt deleted file mode 100644 index ff7af172..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * The resource [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY - ) - - override val partitionKeys: List> = emptyList() - - override fun newReader(): TableReader { - return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("No partition $partition") - } - - override fun toString(): String = "AzureResourceTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt deleted file mode 100644 index c7e7dc36..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Azure v1 VM traces. - */ -public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = name in tables - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> AzureResourceTable(factory, path) - TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "AzureTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 1230d857..77af0d81 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the Azure v1 format. @@ -45,12 +50,60 @@ public class AzureTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT + ), + listOf(RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all VM CPU readings. */ - override fun open(url: URL): AzureTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return AzureTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (_, partPath) = it.next() + return AzureResourceStateTableReader(factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "AzureCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 2c1a2125..b73bb728 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [AzureTraceFormat] class. @@ -35,55 +34,30 @@ import java.net.URL class AzureTraceFormatTest { private val format = AzureTraceFormat() - @Test - fun testTraceExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertThrows { - format.open(URL(url.toString() + "help")) - } - } - @Test fun testTables() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace") + assertThrows { format.getDetails(path, "test") } } @Test fun testResources() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() - + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, @@ -96,10 +70,8 @@ class AzureTraceFormatTest { @Test fun testSmoke() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt deleted file mode 100644 index 7cb58226..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the extended Bitbrains format. - */ -internal class BitbrainsExResourceStateTable(path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - ) - - override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } else { - null - } - } - - override fun toString(): String = "SvCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } - - override fun toString(): String = "BitbrainsExResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt deleted file mode 100644 index f16c493d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the extended Bitbrains format. - */ -public class BitbrainsExTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return BitbrainsExResourceStateTable(path) - } - - override fun toString(): String = "BitbrainsExTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 06388a84..080b73de 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -22,10 +22,16 @@ package org.opendc.trace.bitbrains +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.bufferedReader +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the extended Bitbrains trace format. @@ -36,12 +42,59 @@ public class BitbrainsExTraceFormat : TraceFormat { */ override val name: String = "bitbrains-ex" + override fun getTables(path: Path): List = listOf(TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all resource state partitions. */ - override fun open(url: URL): BitbrainsExTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsExTrace(path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (_, partPath) = it.next() + return BitbrainsExResourceStateTableReader(partPath.bufferedReader()) + } else { + null + } + } + + override fun toString(): String = "BitbrainsExCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt deleted file mode 100644 index 7b08b8be..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the Bitbrains format. - */ -internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - RESOURCE_STATE_NET_RX, - RESOURCE_STATE_NET_TX, - ) - - override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (partition, path) = it.next() - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "BitbrainsCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } - - override fun toString(): String = "BitbrainsResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt deleted file mode 100644 index d024af2d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resources [Table] in the Bitbrains format. - */ -internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The VMs that belong to the table. - */ - private val vms = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = true - - override val columns: List> = listOf(RESOURCE_ID) - - override val partitionKeys: List> = emptyList() - - override fun newReader(): TableReader { - return BitbrainsResourceTableReader(factory, vms) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } - - override fun toString(): String = "BitbrainsResourceTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt deleted file mode 100644 index bcd8dd52..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Bitbrains format. - */ -public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = tables.contains(name) - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> BitbrainsResourceTable(factory, path) - TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "BitbrainsTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index 55b11fe3..1573726f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the GWF trace format. @@ -45,12 +50,67 @@ public class BitbrainsTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID)) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + RESOURCE_STATE_NET_RX, + RESOURCE_STATE_NET_TX, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val vms = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + BitbrainsResourceTableReader(factory, vms) + } + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open a Bitbrains trace. + * Construct a [TableReader] for reading over all resource state partitions. */ - override fun open(url: URL): BitbrainsTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (partition, partPath) = it.next() + return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index 2e4f176a..d734cf5f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -26,62 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsExTraceFormat] class. */ -class BitbrainsExTraceFormatTest { +internal class BitbrainsExTraceFormatTest { private val format = BitbrainsExTraceFormat() - @Test - fun testTraceExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertThrows { - format.open(URL(url.toString() + "help")) - } - } - @Test fun testTables() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/vm.txt") - assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/vm.txt") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/vm.txt") + assertThrows { format.getDetails(path, "test") } } @Test fun testSmoke() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/vm.txt") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index ff4a33f8..41e7def2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -26,66 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsTraceFormat] class. */ class BitbrainsTraceFormatTest { - @Test - fun testTraceExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertThrows { - format.open(URL(url.toString() + "help")) - } - } + private val format = BitbrainsTraceFormat() @Test fun testTables() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/bitbrains.csv") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/bitbrains.csv") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/bitbrains.csv") + assertThrows { format.getDetails(path, "test") } } @Test fun testResources() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -98,11 +70,8 @@ class BitbrainsTraceFormatTest { @Test fun testSmoke() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt deleted file mode 100644 index ca720de4..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * A [Table] containing the tasks in a GWF trace. - */ -internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS - ) - - override val partitionKeys: List> = listOf(TASK_WORKFLOW_ID) - - override fun newReader(): TableReader { - return GwfTaskTableReader(factory.createParser(url)) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "GwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt deleted file mode 100644 index 166c1e56..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * [Trace] implementation for the GWF format. - */ -public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace { - override val tables: List = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return GwfTaskTable(factory, url) - } - - override fun toString(): String = "GwfTrace[$url]" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 6d542503..0f7b9d6e 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -24,10 +24,10 @@ package org.opendc.trace.gwf import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the GWF trace format. @@ -45,12 +45,30 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) - /** - * Read the tasks in the GWF trace. - */ - public override fun open(url: URL): GwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return GwfTrace(factory, url) + override fun getTables(path: Path): List = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + ), + listOf(TASK_WORKFLOW_ID) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index b209b979..7fe403b2 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -22,13 +22,10 @@ package org.opendc.trace.gwf +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -36,59 +33,32 @@ import java.time.Instant * Test suite for the [GwfTraceFormat] class. */ internal class GwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertThrows { - format.open(URL(input.toString() + "help")) - } - } + private val format = GwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows { format.getDetails(path, "test") } } @Test fun testTableReader() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - val reader = table.newReader() + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -99,13 +69,4 @@ internal class GwfTraceFormatTest { { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, ) } - - @Test - fun testTableReaderPartition() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt deleted file mode 100644 index caacf192..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource state [Table] in the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCE_STATES - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ) - - override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val reader = LocalParquetReader(path.resolve("trace.parquet")) - return OdcVmResourceStateTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt deleted file mode 100644 index 653b28b8..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource [Table] for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY, - ) - - override val partitionKeys: List> = emptyList() - - override fun newReader(): TableReader { - val reader = LocalParquetReader(path.resolve("meta.parquet")) - return OdcVmResourceTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt deleted file mode 100644 index 3e5029b4..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.opendc.trace.TABLE_RESOURCES -import org.opendc.trace.TABLE_RESOURCE_STATES -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * A [Trace] in the OpenDC virtual machine trace format. - */ -public class OdcVmTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = - name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> OdcVmResourceTable(path) - TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path) - else -> null - } - } - - override fun toString(): String = "OdcVmTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 8edba725..29818147 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -24,11 +24,13 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. @@ -39,13 +41,45 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" - /** - * Open a Bitbrains Parquet trace. - */ - override fun open(url: URL): OdcVmTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return OdcVmTrace(path) + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_DURATION, + RESOURCE_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val reader = LocalParquetReader(path.resolve("meta.parquet")) + OdcVmResourceTableReader(reader) + } + TABLE_RESOURCE_STATES -> { + val reader = LocalParquetReader(path.resolve("trace.parquet")) + OdcVmResourceStateTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } public companion object { diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 9fb6028d..bfe0f881 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,8 +29,7 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [OdcVmTraceFormat] implementation. @@ -38,53 +37,31 @@ import java.net.URL internal class OdcVmTraceFormatTest { private val format = OdcVmTraceFormat() - @Test - fun testTraceExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertDoesNotThrow { format.open(url) } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertThrows { - format.open(URL(url.toString() + "help")) - } - } - @Test fun testTables() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace-v2.1") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace-v2.1") + assertThrows { format.getDetails(path, "test") } } @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -104,10 +81,8 @@ internal class OdcVmTraceFormatTest { @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt deleted file mode 100644 index 4898779d..00000000 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.* -import java.nio.file.Path -import kotlin.io.path.bufferedReader - -/** - * A [Table] containing the tasks in a SWF trace. - */ -internal class SwfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override val partitionKeys: List> = emptyList() - - override fun newReader(): TableReader { - val reader = path.bufferedReader() - return SwfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "SwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt deleted file mode 100644 index d4da735e..00000000 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * [Trace] implementation for the SWF format. - */ -public class SwfTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - return SwfTaskTable(path) - } - - override fun toString(): String = "SwfTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 36c3122e..4cb7e49e 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -22,10 +22,11 @@ package org.opendc.trace.swf +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path +import kotlin.io.path.bufferedReader /** * Support for the Standard Workload Format (SWF) in OpenDC. @@ -35,9 +36,33 @@ import kotlin.io.path.exists public class SwfTraceFormat : TraceFormat { override val name: String = "swf" - override fun open(url: URL): SwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return SwfTrace(path) + override fun getTables(path: Path): List = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 828c2bfa..4dcd43f6 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -27,61 +27,38 @@ import org.junit.jupiter.api.Assertions.* import org.opendc.trace.TABLE_TASKS import org.opendc.trace.TASK_ALLOC_NCPUS import org.opendc.trace.TASK_ID -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [SwfTraceFormat] class. */ internal class SwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertThrows { - format.open(URL(input.toString() + "help")) - } - } + private val format = SwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows { format.getDetails(path, "test") } } @Test fun testReader() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -94,14 +71,4 @@ internal class SwfTraceFormatTest { reader.close() } - - @Test - fun testReaderPartition() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - - assertThrows { - trace.getTable(TABLE_TASKS)!!.newReader("test") - } - } } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 0b089904..cd5d287f 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -35,9 +35,6 @@ import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* -import org.opendc.trace.azure.AzureTraceFormat -import org.opendc.trace.bitbrains.BitbrainsExTraceFormat -import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.opendc.OdcVmTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File @@ -78,11 +75,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { * The input format of the trace. */ private val format by option("-f", "--format", help = "input format of trace") - .choice( - "solvinity" to BitbrainsExTraceFormat(), - "bitbrains" to BitbrainsTraceFormat(), - "azure" to AzureTraceFormat() - ) + .choice("bitbrains-ex", "bitbrains", "azure") .required() /** @@ -101,7 +94,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - val trace = format.open(input.toURI().toURL()) + val trace = Trace.open(input, format = format) logger.info { "Building resources table" } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt deleted file mode 100644 index 17aeee97..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a WfCommons workload trace. - */ -internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN - ) - - override val partitionKeys: List> = emptyList() - - override fun newReader(): TableReader { - val parser = factory.createParser(path.toFile()) - return WfFormatTaskTableReader(parser) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WfFormatTaskTable" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt deleted file mode 100644 index 2d9c79fb..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * [Trace] implementation for the WfCommons workload trace format. - */ -public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { - override val tables: List = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_TASKS -> WfFormatTaskTable(factory, path) - else -> null - } - } - - override fun toString(): String = "WfFormatTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index ff8d054c..825c3d6d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -23,10 +23,10 @@ package org.opendc.trace.wfformat import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the WfCommons workload trace format. @@ -39,9 +39,29 @@ public class WfFormatTraceFormat : TraceFormat { override val name: String = "wfformat" - override fun open(url: URL): WfFormatTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WfFormatTrace(factory, path) + override fun getTables(path: Path): List = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 0bfc8840..217b175d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -22,59 +22,38 @@ package org.opendc.trace.wfformat +import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [WfFormatTraceFormat] class. */ class WfFormatTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertDoesNotThrow { format.open(input) } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertThrows { format.open(URL(input.toString() + "help")) } - } + private val format = WfFormatTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get("src/test/resources/trace.json") + Assertions.assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows { format.getDetails(path, "test") } } /** @@ -82,9 +61,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -110,9 +88,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReaderFull() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertDoesNotThrow { while (reader.nextRow()) { @@ -121,13 +98,4 @@ class WfFormatTraceFormatTest { reader.close() } } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt deleted file mode 100644 index 410bb347..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a GWF trace. - */ -internal class WtfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override val partitionKeys: List> = listOf(TASK_SUBMIT_TIME) - - override fun newReader(): TableReader { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0")) - return WtfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WtfTaskTable" -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt deleted file mode 100644 index a755a107..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * [Trace] implementation for the WTF format. - */ -public class WtfTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return WtfTaskTable(path) - } - - override fun toString(): String = "WtfTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index 781cb335..2f17694f 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -22,10 +22,12 @@ package org.opendc.trace.wtf +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path /** * A [TraceFormat] implementation for the Workflow Trace Format (WTF). @@ -33,9 +35,36 @@ import kotlin.io.path.exists public class WtfTraceFormat : TraceFormat { override val name: String = "wtf" - override fun open(url: URL): WtfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WtfTrace(path) + override fun getTables(path: Path): List = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ), + listOf(TASK_SUBMIT_TIME) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> { + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0")) + WtfTaskTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index b155f265..09c3703a 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -35,51 +34,25 @@ import java.time.Instant * Test suite for the [WtfTraceFormat] class. */ class WtfTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - org.junit.jupiter.api.assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - assertThrows { - format.open(URL(input.toString() + "help")) - } - } + private val format = WtfTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) - - assertEquals(listOf(TABLE_TASKS), trace.tables) + val path = Paths.get("src/test/resources/wtf-trace") + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - org.junit.jupiter.api.assertDoesNotThrow { table!!.newReader() } + val path = Paths.get("src/test/resources/wtf-trace") + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/wtf-trace") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows { format.getDetails(path, "test") } } /** @@ -87,9 +60,8 @@ class WtfTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/wtf-trace") - val trace = WtfTraceFormat().open(input.toURI().toURL()) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/wtf-trace") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -111,13 +83,4 @@ class WtfTraceFormatTest { reader.close() } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows { table.newReader("test") } - } } diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 941202d2..43b64b15 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,7 +39,8 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcTrace.opendcTraceGwf) + testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 74316437..728dfd99 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -44,13 +44,14 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock -import org.opendc.trace.gwf.GwfTraceFormat +import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.nio.file.Paths import java.time.Duration import java.util.* @@ -105,7 +106,10 @@ internal class WorkflowServiceTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf"))) + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) val replayer = TraceReplayer(trace) replayer.replay(clock, scheduler) -- cgit v1.2.3 From 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 21 Sep 2021 11:34:34 +0200 Subject: feat(trace): Add support for writing traces This change adds a new API for writing traces in a trace format. Currently, writing is only supported by the OpenDC VM format, but over time the other formats will also have support for writing added. --- .../opendc-compute-workload/build.gradle.kts | 2 +- .../opendc/compute/workload/ComputeSchedulers.kt | 86 ++++++++++++ .../compute/workload/ComputeWorkloadLoader.kt | 6 +- .../opendc/compute/workload/ComputeWorkloads.kt | 2 +- .../workload/internal/TraceComputeWorkload.kt | 4 +- .../opendc-experiments-capelin/build.gradle.kts | 4 +- .../org/opendc/experiments/capelin/Portfolio.kt | 2 +- .../experiments/capelin/util/ComputeSchedulers.kt | 86 ------------ .../src/main/kotlin/org/opendc/trace/Table.kt | 7 + .../main/kotlin/org/opendc/trace/TableWriter.kt | 151 +++++++++++++++++++++ .../src/main/kotlin/org/opendc/trace/Trace.kt | 30 +++- .../kotlin/org/opendc/trace/internal/TableImpl.kt | 3 + .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 21 +++ .../org/opendc/trace/azure/AzureTraceFormat.kt | 8 ++ .../trace/bitbrains/BitbrainsExTraceFormat.kt | 8 ++ .../opendc/trace/bitbrains/BitbrainsTraceFormat.kt | 8 ++ .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 8 ++ .../trace/opendc/OdcVmResourceStateTableWriter.kt | 123 +++++++++++++++++ .../trace/opendc/OdcVmResourceTableWriter.kt | 106 +++++++++++++++ .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 43 ++++++ .../kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 8 ++ opendc-trace/opendc-trace-tools/build.gradle.kts | 11 +- .../org/opendc/trace/tools/TraceConverter.kt | 82 +++++------ .../opendc/trace/wfformat/WfFormatTraceFormat.kt | 8 ++ .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 8 ++ opendc-web/opendc-web-runner/build.gradle.kts | 4 +- .../src/main/kotlin/org/opendc/web/runner/Main.kt | 6 +- 27 files changed, 677 insertions(+), 158 deletions(-) create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index e82cf203..28a5e1da 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcTrace.opendcTraceOpendc) + implementation(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt new file mode 100644 index 00000000..c94f30e4 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.compute.workload + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +public fun createComputeScheduler(name: String, seeder: Random, placements: Map = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (name) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(placements) + else -> throw IllegalArgumentException("Unknown policy $name") + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 6dba41e6..7c579e39 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -145,15 +145,15 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } /** - * Load the trace with the specified [name]. + * Load the trace with the specified [name] and [format]. */ - public fun get(name: String): List { + public fun get(name: String, format: String): List { return cache.computeIfAbsent(name) { val path = baseDir.resolve(it) logger.info { "Loading trace $it at $path" } - val trace = Trace.open(path, format = "opendc-vm") + val trace = Trace.open(path, format) val fragments = parseFragments(trace) parseMeta(trace, fragments) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt index f58ce587..2f4935ca 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -31,7 +31,7 @@ import org.opendc.compute.workload.internal.TraceComputeWorkload /** * Construct a workload from a trace. */ -public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name) +public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) /** * Construct a composite workload with the specified fractions. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index d657ff01..c20cb8f3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -30,8 +30,8 @@ import java.util.* /** * A [ComputeWorkload] from a trace. */ -internal class TraceComputeWorkload(val name: String) : ComputeWorkload { +internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { override fun resolve(loader: ComputeWorkloadLoader, random: Random): List { - return loader.get(name) + return loader.get(name, format) } } diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 4bcbaf61..23a3e4a7 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -33,8 +33,6 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) api(projects.opendcCompute.opendcComputeWorkload) - implementation(projects.opendcTrace.opendcTraceParquet) - implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcCompute.opendcComputeSimulator) @@ -49,5 +47,7 @@ dependencies { implementation(kotlin("reflect")) implementation(libs.opentelemetry.semconv) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) + testImplementation(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 630b76c4..2201a6b4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -26,6 +26,7 @@ import com.typesafe.config.ConfigFactory import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetExportMonitor import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply @@ -34,7 +35,6 @@ import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt deleted file mode 100644 index 3b7c3f0f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ComputeSchedulers") -package org.opendc.experiments.capelin.util - -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import java.util.* - -/** - * Create a [ComputeScheduler] for the experiment. - */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = Random(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } -} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 031ee269..b0181cbc 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -45,4 +45,11 @@ public interface Table { * Open a [TableReader] for this table. */ public fun newReader(): TableReader + + /** + * Open a [TableWriter] for this table. + * + * @throws UnsupportedOperationException if writing is not supported by the table. + */ + public fun newWriter(): TableWriter } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt new file mode 100644 index 00000000..423ce86a --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * Base class for writing workload traces. + */ +public interface TableWriter : AutoCloseable { + /** + * Start a new row in the table. + */ + public fun startRow() + + /** + * Flush the current row to the table. + */ + public fun endRow() + + /** + * Resolve the index of the specified [column] for this writer. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + + /** + * Determine whether the [TableReader] supports the specified [column]. + */ + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Set [column] to [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun set(index: Int, value: Any) + + /** + * Set [column] to boolean [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(index: Int, value: Boolean) + + /** + * Set [column] to integer [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(index: Int, value: Int) + + /** + * Set [column] to long [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(index: Int, value: Long) + + /** + * Set [column] to double [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(index: Int, value: Double) + + /** + * Set [column] to [value]. + * + * @param column The column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun set(column: TableColumn, value: T): Unit = set(resolve(column), value) + + /** + * Set [column] to boolean [value]. + * + * @param column The column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(column: TableColumn, value: Boolean): Unit = setBoolean(resolve(column), value) + + /** + * Set [column] to integer [value]. + * + * @param column The column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(column: TableColumn, value: Int): Unit = setInt(resolve(column), value) + + /** + * Set [column] to long [value]. + * + * @param column The column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(column: TableColumn, value: Long): Unit = setLong(resolve(column), value) + + /** + * Set [column] to double [value]. + * + * @param column The column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(column: TableColumn, value: Double): Unit = setDouble(resolve(column), value) + + /** + * Flush any buffered content to the underlying target. + */ + public fun flush() + + /** + * Close the writer so that no more rows can be written. + */ + public override fun close() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 6d0014cb..64e8f272 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -51,21 +51,45 @@ public interface Trace { * Open a [Trace] at the specified [path] in the given [format]. * * @param path The path to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ - public fun open(path: File, format: String): Trace { - return open(path.toPath(), format) - } + @JvmStatic + public fun open(path: File, format: String): Trace = open(path.toPath(), format) /** * Open a [Trace] at the specified [path] in the given [format]. * * @param path The [Path] to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ + @JvmStatic public fun open(path: Path, format: String): Trace { val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } return TraceImpl(provider, path) } + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: File, format: String): Trace = create(path.toPath(), format) + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: Path, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + provider.create(path) + return TraceImpl(provider, path) + } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index fd0a0f04..24551edb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -25,6 +25,7 @@ package org.opendc.trace.internal import org.opendc.trace.Table import org.opendc.trace.TableColumn import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter import java.util.* /** @@ -44,6 +45,8 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name) + override fun toString(): String = "Table[name=$name]" override fun hashCode(): Int = Objects.hash(trace, name) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index e04dd948..f2e610db 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -23,6 +23,7 @@ package org.opendc.trace.spi import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter import java.nio.file.Path import java.util.* @@ -35,6 +36,15 @@ public interface TraceFormat { */ public val name: String + /** + * Construct an empty trace at [path]. + * + * @param path The path where to create the empty trace. + * @throws IllegalArgumentException If [path] is invalid. + * @throws UnsupportedOperationException If the table does not support trace creation. + */ + public fun create(path: Path) + /** * Return the name of the tables available in the trace at the specified [path]. * @@ -63,6 +73,17 @@ public interface TraceFormat { */ public fun newReader(path: Path, table: String): TableReader + /** + * Open a [TableWriter] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableWriter] for. + * @throws IllegalArgumentException If [table] does not exist. + * @throws UnsupportedOperationException If the format does not support writing. + * @return A [TableWriter] instance for the table. + */ + public fun newWriter(path: Path, table: String): TableWriter + /** * A helper object for resolving providers. */ diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 77af0d81..253c7057 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -50,6 +50,10 @@ public class AzureTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -83,6 +87,10 @@ public class AzureTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** * Construct a [TableReader] for reading over all VM CPU readings. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 080b73de..20222c8a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -42,6 +42,10 @@ public class BitbrainsExTraceFormat : TraceFormat { */ override val name: String = "bitbrains-ex" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -74,6 +78,10 @@ public class BitbrainsExTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** * Construct a [TableReader] for reading over all resource state partitions. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index 1573726f..3885c931 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -50,6 +50,10 @@ public class BitbrainsTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -90,6 +94,10 @@ public class BitbrainsTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } + /** * Construct a [TableReader] for reading over all resource state partitions. */ diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 0f7b9d6e..d4287420 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -45,6 +45,10 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -71,4 +75,8 @@ public class GwfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt new file mode 100644 index 00000000..15a8cb85 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Duration +import java.time.Instant + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceStateTableWriter( + private val writer: ParquetWriter, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource state schema. + */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + + val record = builder.build() + val id = record[COL_ID] as String + val timestamp = record[COL_TIMESTAMP] as Long + + check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + + writer.write(builder.build()) + + lastId = id + lastTimestamp = timestamp + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos() + RESOURCE_STATE_DURATION -> schema.getField("duration").pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + + builder.set( + fields[index], + when (index) { + COL_TIMESTAMP -> (value as Instant).toEpochMilli() + COL_DURATION -> (value as Duration).toMillis() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Last column values that are used to check for correct partitioning. + */ + private var lastId: String? = null + private var lastTimestamp: Long = Long.MIN_VALUE + + /** + * Columns with special behavior. + */ + private val COL_ID = resolve(RESOURCE_ID) + private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP) + private val COL_DURATION = resolve(RESOURCE_STATE_DURATION) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt new file mode 100644 index 00000000..9cc6ca7d --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Instant +import kotlin.math.roundToLong + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceTableWriter( + private val writer: ParquetWriter, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource schema. + */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + writer.write(builder.build()) + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + builder.set( + fields[index], + when (index) { + COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli() + COL_MEM_CAPACITY -> (value as Double).roundToLong() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Columns with special behavior. + */ + private val COL_START_TIME = resolve(RESOURCE_START_TIME) + private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME) + private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 29818147..9b32f8fd 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -25,11 +25,16 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecord +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import java.nio.file.Files import java.nio.file.Path /** @@ -41,6 +46,18 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" + override fun create(path: Path) { + // Construct directory containing the trace files + Files.createDirectory(path) + + val tables = getTables(path) + + for (table in tables) { + val writer = newWriter(path, table) + writer.close() + } + } + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -82,6 +99,32 @@ public class OdcVmTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + return when (table) { + TABLE_RESOURCES -> { + val schema = RESOURCES_SCHEMA + val writer = AvroParquetWriter.builder(LocalOutputFile(path.resolve("meta.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceTableWriter(writer, schema) + } + TABLE_RESOURCE_STATES -> { + val schema = RESOURCE_STATES_SCHEMA + val writer = AvroParquetWriter.builder(LocalOutputFile(path.resolve("trace.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withDictionaryEncoding("id", true) + .withBloomFilterEnabled("id", true) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceStateTableWriter(writer, schema) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } + public companion object { /** * Schema for the resources table in the trace. diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 4cb7e49e..1fd076d5 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -36,6 +36,10 @@ import kotlin.io.path.bufferedReader public class SwfTraceFormat : TraceFormat { override val name: String = "swf" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -65,4 +69,8 @@ public class SwfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 35190dba..14a0fc7c 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -29,19 +29,18 @@ plugins { } application { - mainClass.set("org.opendc.trace.tools.TraceConverterKt") + mainClass.set("org.opendc.trace.tools.TraceConverter") } dependencies { api(platform(projects.opendcPlatform)) - implementation(projects.opendcTrace.opendcTraceParquet) - implementation(projects.opendcTrace.opendcTraceOpendc) - implementation(projects.opendcTrace.opendcTraceAzure) - implementation(projects.opendcTrace.opendcTraceBitbrains) - + implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) + runtimeOnly(projects.opendcTrace.opendcTraceBitbrains) + runtimeOnly(projects.opendcTrace.opendcTraceAzure) runtimeOnly(libs.log4j.slf4j) } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index cd5d287f..6fad43be 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -20,6 +20,7 @@ * SOFTWARE. */ +@file:JvmName("TraceConverter") package org.opendc.trace.tools import com.github.ajalt.clikt.core.CliktCommand @@ -29,25 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.* import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* -import org.opendc.trace.opendc.OdcVmTraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File +import java.time.Duration +import java.time.Instant import java.util.* import kotlin.math.abs import kotlin.math.max import kotlin.math.min -import kotlin.math.roundToLong /** * A script to convert a trace in text format into a Parquet trace. */ -public fun main(args: Array): Unit = TraceConverterCli().main(args) +fun main(args: Array): Unit = TraceConverterCli().main(args) /** * Represents the command for converting traces @@ -74,10 +69,15 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The input format of the trace. */ - private val format by option("-f", "--format", help = "input format of trace") - .choice("bitbrains-ex", "bitbrains", "azure") + private val inputFormat by option("-f", "--input-format", help = "format of output trace") .required() + /** + * The format of the output trace. + */ + private val outputFormat by option("--output-format", help = "format of output trace") + .default("opendc-vm") + /** * The sampling options. */ @@ -94,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - val trace = Trace.open(input, format = format) + val inputTrace = Trace.open(input, format = inputFormat) + val outputTrace = Trace.create(output, format = outputFormat) logger.info { "Building resources table" } - val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet)) - .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enablePageWriteChecksum() - .build() + val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter() - val selectedVms = metaWriter.use { convertResources(trace, it) } + val selectedVms = metaWriter.use { convertResources(inputTrace, it) } if (selectedVms.isEmpty()) { logger.warn { "No VMs selected" } @@ -114,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { logger.info { "Wrote ${selectedVms.size} rows" } logger.info { "Building resource states table" } - val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) - .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withBloomFilterNDV("id", selectedVms.size.toLong()) - .enableValidation() - .build() + val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter() - val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } + val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) } logger.info { "Wrote $statesCount rows" } } /** * Convert the resources table for the trace. */ - private fun convertResources(trace: Trace, writer: ParquetWriter): Set { + private fun convertResources(trace: Trace, writer: TableWriter): Set { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() @@ -168,18 +158,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { continue } - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA) - - builder["id"] = id - builder["start_time"] = startTime - builder["stop_time"] = stopTime - builder["cpu_count"] = numCpus - builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong() - logger.info { "Selecting VM $id" } - - writer.write(builder.build()) selectedVms.add(id) + + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime)) + writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime)) + writer.setInt(RESOURCE_CPU_COUNT, numCpus) + writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) + writer.endRow() } return selectedVms @@ -188,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * Convert the resource states table for the trace. */ - private fun convertResourceStates(trace: Trace, writer: ParquetWriter, selectedVms: Set): Int { + private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() var hasNextRow = reader.nextRow() @@ -231,15 +219,13 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { cpuCount == reader.getInt(RESOURCE_CPU_COUNT) } while (shouldContinue) - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - - builder["id"] = id - builder["timestamp"] = startTimestamp - builder["duration"] = duration - builder["cpu_count"] = cpuCount - builder["cpu_usage"] = cpuUsage - - writer.write(builder.build()) + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp)) + writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) + writer.endRow() count++ diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index 825c3d6d..c75e3cbb 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -39,6 +39,10 @@ public class WfFormatTraceFormat : TraceFormat { override val name: String = "wfformat" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -64,4 +68,8 @@ public class WfFormatTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index 2f17694f..ef88d295 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -35,6 +35,10 @@ import java.nio.file.Path public class WtfTraceFormat : TraceFormat { override val name: String = "wtf" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -67,4 +71,8 @@ public class WtfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index bfbb1687..810f512f 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -36,10 +36,11 @@ application { dependencies { api(platform(projects.opendcPlatform)) implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcExperiments.opendcExperimentsCapelin) + implementation(projects.opendcCompute.opendcComputeWorkload) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcTelemetry.opendcTelemetryCompute) + implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) @@ -50,6 +51,7 @@ dependencies { implementation(libs.jackson.datatype.jsr310) implementation(kotlin("reflect")) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) runtimeOnly(libs.log4j.slf4j) testImplementation(libs.ktor.client.mock) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 1b518fee..40a7ea62 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -33,8 +33,6 @@ import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.PerformanceInterferenceReader -import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -181,7 +179,7 @@ class RunnerCli : CliktCommand(name = "runner") { val operational = scenario.operationalPhenomena val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) - val workload = Workload(workloadName, trace(workloadName).sampleByLoad(workloadFraction)) + val workload = trace(workloadName).sampleByLoad(workloadFraction) val failureModel = if (operational.failuresEnabled) @@ -203,7 +201,7 @@ class RunnerCli : CliktCommand(name = "runner") { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) + simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { simulator.close() metricReader.close() -- cgit v1.2.3