diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-20 22:04:23 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-20 22:04:23 +0200 |
| commit | c7fff03408ee3109d0a39a96c043584a2d8f67ca (patch) | |
| tree | 8c4a877e0f00f14a9091f9c26fdb0e85cad94904 | |
| parent | 140aafdaa711b0fdeacf99b9c7e70b706b8490f4 (diff) | |
refactor(trace): Simplify TraceFormat SPI interface
This change simplifies the TraceFormat SPI interface by reducing the
number of interfaces that implementors need to implement to only
TraceFormat.
42 files changed, 542 insertions, 1462 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index ab7f051f..6dba41e6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -25,7 +25,6 @@ package org.opendc.compute.workload import mu.KotlinLogging import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.trace.* -import org.opendc.trace.opendc.OdcVmTraceFormat import java.io.File import java.time.Duration import java.time.Instant @@ -45,11 +44,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private val logger = KotlinLogging.logger {} /** - * The [OdcVmTraceFormat] instance to load the traces - */ - private val format = OdcVmTraceFormat() - - /** * The cache of workloads. */ private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() @@ -159,7 +153,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { logger.info { "Loading trace $it at $path" } - val trace = format.open(path.toURI().toURL()) + val trace = Trace.open(path, format = "opendc-vm") val fragments = parseFragments(trace) parseMeta(trace, fragments) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 164f5084..031ee269 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -32,11 +32,6 @@ public interface Table { public val name: String /** - * A flag to indicate that the table is synthetic (derived from another table). - */ - public val isSynthetic: Boolean - - /** * The list of columns supported in this table. */ public val columns: List<TableColumn<*>> @@ -50,9 +45,4 @@ public interface Table { * Open a [TableReader] for this table. */ public fun newReader(): TableReader - - /** - * Open a [TableReader] for [partition] of the table. - */ - public fun newReader(partition: String): TableReader } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 0ae45e86..6d0014cb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,9 +22,9 @@ package org.opendc.trace +import org.opendc.trace.internal.TraceImpl import org.opendc.trace.spi.TraceFormat import java.io.File -import java.net.URL import java.nio.file.Path /** @@ -48,31 +48,24 @@ public interface Trace { public companion object { /** - * Open a [Trace] at the specified [url] in the given [format]. - * - * @throws IllegalArgumentException if [format] is not supported. - */ - public fun open(url: URL, format: String): Trace { - val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } - return provider.open(url) - } - - /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The path to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: File, format: String): Trace { - return open(path.toURI().toURL(), format) + return open(path.toPath(), format) } /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The [Path] to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: Path, format: String): Trace { - return open(path.toUri().toURL(), format) + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return TraceImpl(provider, path) } } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index 3e5029b4..fd0a0f04 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -20,30 +20,33 @@ * SOFTWARE. */ -package org.opendc.trace.opendc +package org.opendc.trace.internal -import org.opendc.trace.TABLE_RESOURCES -import org.opendc.trace.TABLE_RESOURCE_STATES import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.util.* /** - * A [Trace] in the OpenDC virtual machine trace format. + * Internal implementation of [Table]. */ -public class OdcVmTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) +internal class TableImpl(val trace: TraceImpl, override val name: String) : Table { + /** + * The details of this table. + */ + private val details = trace.format.getDetails(trace.path, name) - override fun containsTable(name: String): Boolean = - name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES + override val columns: List<TableColumn<*>> + get() = details.columns - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> OdcVmResourceTable(path) - TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path) - else -> null - } - } + override val partitionKeys: List<TableColumn<*>> + get() = details.partitionKeys - override fun toString(): String = "OdcVmTrace[$path]" + override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + + override fun toString(): String = "Table[name=$name]" + + override fun hashCode(): Int = Objects.hash(trace, name) + + override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt index a755a107..fd9536ab 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt @@ -20,28 +20,37 @@ * SOFTWARE. */ -package org.opendc.trace.wtf +package org.opendc.trace.internal -import org.opendc.trace.TABLE_TASKS import org.opendc.trace.Table import org.opendc.trace.Trace +import org.opendc.trace.spi.TraceFormat import java.nio.file.Path +import java.util.* +import java.util.concurrent.ConcurrentHashMap /** - * [Trace] implementation for the WTF format. + * Internal implementation of the [Trace] interface. */ -public class WtfTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) +internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace { + /** + * A map containing the [TableImpl] instances associated with the trace. + */ + private val tableMap = ConcurrentHashMap<String, TableImpl>() - override fun containsTable(name: String): Boolean = TABLE_TASKS == name + override val tables: List<String> = format.getTables(path) - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null + init { + for (table in tables) { + tableMap.computeIfAbsent(table) { TableImpl(this, it) } } - - return WtfTaskTable(path) } - override fun toString(): String = "WtfTrace[$path]" + override fun containsTable(name: String): Boolean = tableMap.containsKey(name) + + override fun getTable(name: String): Table? = tableMap[name] + + override fun hashCode(): Int = Objects.hash(format, path) + + override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt index d4da735e..1a9b9ee1 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -20,27 +20,18 @@ * SOFTWARE. */ -package org.opendc.trace.swf +package org.opendc.trace.spi -import org.opendc.trace.TABLE_TASKS import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path +import org.opendc.trace.TableColumn /** - * [Trace] implementation for the SWF format. + * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. + * + * @param columns The available columns in the table. + * @param partitionKeys The table columns that act as partition keys for the table. */ -public class SwfTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - return SwfTaskTable(path) - } - - override fun toString(): String = "SwfTrace[$path]" -} +public data class TableDetails( + val columns: List<TableColumn<*>>, + val partitionKeys: List<TableColumn<*>> = emptyList() +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 54029fcf..e04dd948 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,8 +22,8 @@ package org.opendc.trace.spi -import org.opendc.trace.Trace -import java.net.URL +import org.opendc.trace.TableReader +import java.nio.file.Path import java.util.* /** @@ -36,11 +36,32 @@ public interface TraceFormat { public val name: String /** - * Open a new [Trace] with this provider. + * Return the name of the tables available in the trace at the specified [path]. * - * @param url A reference to the trace. + * @param path The path to the trace. + * @return The list of tables available in the trace. */ - public fun open(url: URL): Trace + public fun getTables(path: Path): List<String> + + /** + * Return the details of [table] in the trace at the specified [path]. + * + * @param path The path to the trace. + * @param table The name of the table to obtain the details for. + * @throws IllegalArgumentException If [table] does not exist. + * @return The [TableDetails] for the specified [table]. + */ + public fun getDetails(path: Path, table: String): TableDetails + + /** + * Open a [TableReader] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableReader] for. + * @throws IllegalArgumentException If [table] does not exist. + * @return A [TableReader] instance for the table. + */ + public fun newReader(path: Path, table: String): TableReader /** * A helper object for resolving providers. @@ -49,6 +70,7 @@ public interface TraceFormat { /** * A list of [TraceFormat] that are available on this system. */ + @JvmStatic public val installedProviders: List<TraceFormat> by lazy { val loader = ServiceLoader.load(TraceFormat::class.java) loader.toList() @@ -57,6 +79,7 @@ public interface TraceFormat { /** * Obtain a [TraceFormat] implementation by [name]. */ + @JvmStatic public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name } } } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt deleted file mode 100644 index 285e7216..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } - - override fun toString(): String = "AzureResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt deleted file mode 100644 index ff7af172..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * The resource [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("No partition $partition") - } - - override fun toString(): String = "AzureResourceTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt deleted file mode 100644 index c7e7dc36..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Azure v1 VM traces. - */ -public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = name in tables - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> AzureResourceTable(factory, path) - TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "AzureTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 1230d857..77af0d81 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the Azure v1 format. @@ -45,12 +50,60 @@ public class AzureTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT + ), + listOf(RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all VM CPU readings. */ - override fun open(url: URL): AzureTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return AzureTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (_, partPath) = it.next() + return AzureResourceStateTableReader(factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "AzureCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 2c1a2125..b73bb728 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [AzureTraceFormat] class. @@ -36,54 +35,29 @@ class AzureTraceFormatTest { private val format = AzureTraceFormat() @Test - fun testTraceExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testResources() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() - + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, @@ -96,10 +70,8 @@ class AzureTraceFormatTest { @Test fun testSmoke() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt deleted file mode 100644 index 7cb58226..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the extended Bitbrains format. - */ -internal class BitbrainsExResourceStateTable(path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } else { - null - } - } - - override fun toString(): String = "SvCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } - - override fun toString(): String = "BitbrainsExResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt deleted file mode 100644 index f16c493d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the extended Bitbrains format. - */ -public class BitbrainsExTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return BitbrainsExResourceStateTable(path) - } - - override fun toString(): String = "BitbrainsExTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 06388a84..080b73de 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -22,10 +22,16 @@ package org.opendc.trace.bitbrains +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.bufferedReader +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the extended Bitbrains trace format. @@ -36,12 +42,59 @@ public class BitbrainsExTraceFormat : TraceFormat { */ override val name: String = "bitbrains-ex" + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all resource state partitions. */ - override fun open(url: URL): BitbrainsExTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsExTrace(path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (_, partPath) = it.next() + return BitbrainsExResourceStateTableReader(partPath.bufferedReader()) + } else { + null + } + } + + override fun toString(): String = "BitbrainsExCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt deleted file mode 100644 index 7b08b8be..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the Bitbrains format. - */ -internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - RESOURCE_STATE_NET_RX, - RESOURCE_STATE_NET_TX, - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (partition, path) = it.next() - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "BitbrainsCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } - - override fun toString(): String = "BitbrainsResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt deleted file mode 100644 index d024af2d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resources [Table] in the Bitbrains format. - */ -internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The VMs that belong to the table. - */ - private val vms = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = true - - override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - return BitbrainsResourceTableReader(factory, vms) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } - - override fun toString(): String = "BitbrainsResourceTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt deleted file mode 100644 index bcd8dd52..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Bitbrains format. - */ -public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = tables.contains(name) - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> BitbrainsResourceTable(factory, path) - TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "BitbrainsTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index 55b11fe3..1573726f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the GWF trace format. @@ -45,12 +50,67 @@ public class BitbrainsTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID)) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + RESOURCE_STATE_NET_RX, + RESOURCE_STATE_NET_TX, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val vms = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + BitbrainsResourceTableReader(factory, vms) + } + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open a Bitbrains trace. + * Construct a [TableReader] for reading over all resource state partitions. */ - override fun open(url: URL): BitbrainsTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (partition, partPath) = it.next() + return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index 2e4f176a..d734cf5f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -26,62 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsExTraceFormat] class. */ -class BitbrainsExTraceFormatTest { +internal class BitbrainsExTraceFormatTest { private val format = BitbrainsExTraceFormat() @Test - fun testTraceExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/vm.txt") - assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/vm.txt") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/vm.txt") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testSmoke() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/vm.txt") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index ff4a33f8..41e7def2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -26,66 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsTraceFormat] class. */ class BitbrainsTraceFormatTest { - @Test - fun testTraceExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } + private val format = BitbrainsTraceFormat() @Test fun testTables() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/bitbrains.csv") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/bitbrains.csv") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/bitbrains.csv") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testResources() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -98,11 +70,8 @@ class BitbrainsTraceFormatTest { @Test fun testSmoke() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt deleted file mode 100644 index ca720de4..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * A [Table] containing the tasks in a GWF trace. - */ -internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(TASK_WORKFLOW_ID) - - override fun newReader(): TableReader { - return GwfTaskTableReader(factory.createParser(url)) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "GwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt deleted file mode 100644 index 166c1e56..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * [Trace] implementation for the GWF format. - */ -public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return GwfTaskTable(factory, url) - } - - override fun toString(): String = "GwfTrace[$url]" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 6d542503..0f7b9d6e 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -24,10 +24,10 @@ package org.opendc.trace.gwf import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the GWF trace format. @@ -45,12 +45,30 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) - /** - * Read the tasks in the GWF trace. - */ - public override fun open(url: URL): GwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return GwfTrace(factory, url) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + ), + listOf(TASK_WORKFLOW_ID) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index b209b979..7fe403b2 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -22,13 +22,10 @@ package org.opendc.trace.gwf +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -36,59 +33,32 @@ import java.time.Instant * Test suite for the [GwfTraceFormat] class. */ internal class GwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = GwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testTableReader() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - val reader = table.newReader() + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -99,13 +69,4 @@ internal class GwfTraceFormatTest { { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, ) } - - @Test - fun testTableReaderPartition() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt deleted file mode 100644 index caacf192..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource state [Table] in the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCE_STATES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) - return OdcVmResourceStateTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt deleted file mode 100644 index 653b28b8..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource [Table] for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY, - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) - return OdcVmResourceTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 8edba725..29818147 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -24,11 +24,13 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. @@ -39,13 +41,45 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" - /** - * Open a Bitbrains Parquet trace. - */ - override fun open(url: URL): OdcVmTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return OdcVmTrace(path) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_DURATION, + RESOURCE_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) + OdcVmResourceTableReader(reader) + } + TABLE_RESOURCE_STATES -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) + OdcVmResourceStateTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } public companion object { diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 9fb6028d..bfe0f881 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,8 +29,7 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [OdcVmTraceFormat] implementation. @@ -39,52 +38,30 @@ internal class OdcVmTraceFormatTest { private val format = OdcVmTraceFormat() @Test - fun testTraceExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertDoesNotThrow { format.open(url) } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace-v2.1") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace-v2.1") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -104,10 +81,8 @@ internal class OdcVmTraceFormatTest { @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt deleted file mode 100644 index 4898779d..00000000 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.* -import java.nio.file.Path -import kotlin.io.path.bufferedReader - -/** - * A [Table] containing the tasks in a SWF trace. - */ -internal class SwfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - val reader = path.bufferedReader() - return SwfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "SwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 36c3122e..4cb7e49e 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -22,10 +22,11 @@ package org.opendc.trace.swf +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path +import kotlin.io.path.bufferedReader /** * Support for the Standard Workload Format (SWF) in OpenDC. @@ -35,9 +36,33 @@ import kotlin.io.path.exists public class SwfTraceFormat : TraceFormat { override val name: String = "swf" - override fun open(url: URL): SwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return SwfTrace(path) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 828c2bfa..4dcd43f6 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -27,61 +27,38 @@ import org.junit.jupiter.api.Assertions.* import org.opendc.trace.TABLE_TASKS import org.opendc.trace.TASK_ALLOC_NCPUS import org.opendc.trace.TASK_ID -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [SwfTraceFormat] class. */ internal class SwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = SwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testReader() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -94,14 +71,4 @@ internal class SwfTraceFormatTest { reader.close() } - - @Test - fun testReaderPartition() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - - assertThrows<IllegalArgumentException> { - trace.getTable(TABLE_TASKS)!!.newReader("test") - } - } } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 0b089904..cd5d287f 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -35,9 +35,6 @@ import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* -import org.opendc.trace.azure.AzureTraceFormat -import org.opendc.trace.bitbrains.BitbrainsExTraceFormat -import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.opendc.OdcVmTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File @@ -78,11 +75,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { * The input format of the trace. */ private val format by option("-f", "--format", help = "input format of trace") - .choice( - "solvinity" to BitbrainsExTraceFormat(), - "bitbrains" to BitbrainsTraceFormat(), - "azure" to AzureTraceFormat() - ) + .choice("bitbrains-ex", "bitbrains", "azure") .required() /** @@ -101,7 +94,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - val trace = format.open(input.toURI().toURL()) + val trace = Trace.open(input, format = format) logger.info { "Building resources table" } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt deleted file mode 100644 index 17aeee97..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a WfCommons workload trace. - */ -internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - val parser = factory.createParser(path.toFile()) - return WfFormatTaskTableReader(parser) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WfFormatTaskTable" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt deleted file mode 100644 index 2d9c79fb..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * [Trace] implementation for the WfCommons workload trace format. - */ -public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_TASKS -> WfFormatTaskTable(factory, path) - else -> null - } - } - - override fun toString(): String = "WfFormatTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index ff8d054c..825c3d6d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -23,10 +23,10 @@ package org.opendc.trace.wfformat import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the WfCommons workload trace format. @@ -39,9 +39,29 @@ public class WfFormatTraceFormat : TraceFormat { override val name: String = "wfformat" - override fun open(url: URL): WfFormatTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WfFormatTrace(factory, path) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 0bfc8840..217b175d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -22,59 +22,38 @@ package org.opendc.trace.wfformat +import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [WfFormatTraceFormat] class. */ class WfFormatTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertDoesNotThrow { format.open(input) } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) } - } + private val format = WfFormatTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get("src/test/resources/trace.json") + Assertions.assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } /** @@ -82,9 +61,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -110,9 +88,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReaderFull() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertDoesNotThrow { while (reader.nextRow()) { @@ -121,13 +98,4 @@ class WfFormatTraceFormatTest { reader.close() } } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt deleted file mode 100644 index 410bb347..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a GWF trace. - */ -internal class WtfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(TASK_SUBMIT_TIME) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) - return WtfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WtfTaskTable" -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index 781cb335..2f17694f 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -22,10 +22,12 @@ package org.opendc.trace.wtf +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path /** * A [TraceFormat] implementation for the Workflow Trace Format (WTF). @@ -33,9 +35,36 @@ import kotlin.io.path.exists public class WtfTraceFormat : TraceFormat { override val name: String = "wtf" - override fun open(url: URL): WtfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WtfTrace(path) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ), + listOf(TASK_SUBMIT_TIME) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) + WtfTaskTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index b155f265..09c3703a 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -35,51 +34,25 @@ import java.time.Instant * Test suite for the [WtfTraceFormat] class. */ class WtfTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - org.junit.jupiter.api.assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = WtfTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) - - assertEquals(listOf(TABLE_TASKS), trace.tables) + val path = Paths.get("src/test/resources/wtf-trace") + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - org.junit.jupiter.api.assertDoesNotThrow { table!!.newReader() } + val path = Paths.get("src/test/resources/wtf-trace") + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/wtf-trace") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } /** @@ -87,9 +60,8 @@ class WtfTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/wtf-trace") - val trace = WtfTraceFormat().open(input.toURI().toURL()) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/wtf-trace") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -111,13 +83,4 @@ class WtfTraceFormatTest { reader.close() } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 941202d2..43b64b15 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,7 +39,8 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcTrace.opendcTraceGwf) + testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 74316437..728dfd99 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -44,13 +44,14 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock -import org.opendc.trace.gwf.GwfTraceFormat +import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.nio.file.Paths import java.time.Duration import java.util.* @@ -105,7 +106,10 @@ internal class WorkflowServiceTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf"))) + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) val replayer = TraceReplayer(trace) replayer.replay(clock, scheduler) |
