diff options
Diffstat (limited to 'opendc-trace')
12 files changed, 27 insertions, 0 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 6aca2051..164f5084 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -42,6 +42,11 @@ public interface Table { public val columns: List<TableColumn<*>> /** + * The columns by which the table is partitioned. + */ + public val partitionKeys: List<TableColumn<*>> + + /** * Open a [TableReader] for this table. */ public fun newReader(): TableReader diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index 8f2f5cc9..285e7216 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -53,6 +53,8 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa RESOURCE_STATE_CPU_USAGE_PCT ) + override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt index 96ee3158..ff7af172 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt @@ -42,6 +42,8 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p RESOURCE_MEM_CAPACITY ) + override val partitionKeys: List<TableColumn<*>> = emptyList() + override fun newReader(): TableReader { return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt index ab768608..7cb58226 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt @@ -62,6 +62,8 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { RESOURCE_STATE_DISK_WRITE, ) + override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 6b6ac9da..7b08b8be 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -63,6 +63,8 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path RESOURCE_STATE_NET_TX, ) + override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt index bc4f0b7d..d024af2d 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt @@ -49,6 +49,8 @@ internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Pat override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID) + override val partitionKeys: List<TableColumn<*>> = emptyList() + override fun newReader(): TableReader { return BitbrainsResourceTableReader(factory, vms) } diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index fd7bd068..ca720de4 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -44,6 +44,8 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR TASK_PARENTS ) + override val partitionKeys: List<TableColumn<*>> = listOf(TASK_WORKFLOW_ID) + override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index 39613070..caacf192 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -42,6 +42,8 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { RESOURCE_STATE_CPU_USAGE, ) + override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) return OdcVmResourceStateTableReader(reader) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt index b1456560..653b28b8 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt @@ -42,6 +42,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table { RESOURCE_MEM_CAPACITY, ) + override val partitionKeys: List<TableColumn<*>> = emptyList() + override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) return OdcVmResourceTableReader(reader) diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt index 7ec0d607..4898779d 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -47,6 +47,8 @@ internal class SwfTaskTable(private val path: Path) : Table { TASK_USER_ID ) + override val partitionKeys: List<TableColumn<*>> = emptyList() + override fun newReader(): TableReader { val reader = path.bufferedReader() return SwfTaskTableReader(reader) diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt index 7b7f979f..17aeee97 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt @@ -43,6 +43,8 @@ internal class WfFormatTaskTable(private val factory: JsonFactory, private val p TASK_CHILDREN ) + override val partitionKeys: List<TableColumn<*>> = emptyList() + override fun newReader(): TableReader { val parser = factory.createParser(path.toFile()) return WfFormatTaskTableReader(parser) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt index 74202718..410bb347 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt @@ -48,6 +48,8 @@ internal class WtfTaskTable(private val path: Path) : Table { TASK_USER_ID ) + override val partitionKeys: List<TableColumn<*>> = listOf(TASK_SUBMIT_TIME) + override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) return WtfTaskTableReader(reader) |
