diff options
42 files changed, 542 insertions, 1462 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index ab7f051f..6dba41e6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -25,7 +25,6 @@ package org.opendc.compute.workload import mu.KotlinLogging import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.trace.* -import org.opendc.trace.opendc.OdcVmTraceFormat import java.io.File import java.time.Duration import java.time.Instant @@ -45,11 +44,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private val logger = KotlinLogging.logger {} /** - * The [OdcVmTraceFormat] instance to load the traces - */ - private val format = OdcVmTraceFormat() - - /** * The cache of workloads. */ private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() @@ -159,7 +153,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { logger.info { "Loading trace $it at $path" } - val trace = format.open(path.toURI().toURL()) + val trace = Trace.open(path, format = "opendc-vm") val fragments = parseFragments(trace) parseMeta(trace, fragments) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 164f5084..031ee269 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -32,11 +32,6 @@ public interface Table { public val name: String /** - * A flag to indicate that the table is synthetic (derived from another table). - */ - public val isSynthetic: Boolean - - /** * The list of columns supported in this table. */ public val columns: List<TableColumn<*>> @@ -50,9 +45,4 @@ public interface Table { * Open a [TableReader] for this table. */ public fun newReader(): TableReader - - /** - * Open a [TableReader] for [partition] of the table. - */ - public fun newReader(partition: String): TableReader } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 0ae45e86..6d0014cb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,9 +22,9 @@ package org.opendc.trace +import org.opendc.trace.internal.TraceImpl import org.opendc.trace.spi.TraceFormat import java.io.File -import java.net.URL import java.nio.file.Path /** @@ -48,31 +48,24 @@ public interface Trace { public companion object { /** - * Open a [Trace] at the specified [url] in the given [format]. - * - * @throws IllegalArgumentException if [format] is not supported. - */ - public fun open(url: URL, format: String): Trace { - val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } - return provider.open(url) - } - - /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The path to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: File, format: String): Trace { - return open(path.toURI().toURL(), format) + return open(path.toPath(), format) } /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The [Path] to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: Path, format: String): Trace { - return open(path.toUri().toURL(), format) + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return TraceImpl(provider, path) } } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index 3e5029b4..fd0a0f04 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -20,30 +20,33 @@ * SOFTWARE. */ -package org.opendc.trace.opendc +package org.opendc.trace.internal -import org.opendc.trace.TABLE_RESOURCES -import org.opendc.trace.TABLE_RESOURCE_STATES import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.util.* /** - * A [Trace] in the OpenDC virtual machine trace format. + * Internal implementation of [Table]. */ -public class OdcVmTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) +internal class TableImpl(val trace: TraceImpl, override val name: String) : Table { + /** + * The details of this table. + */ + private val details = trace.format.getDetails(trace.path, name) - override fun containsTable(name: String): Boolean = - name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES + override val columns: List<TableColumn<*>> + get() = details.columns - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> OdcVmResourceTable(path) - TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path) - else -> null - } - } + override val partitionKeys: List<TableColumn<*>> + get() = details.partitionKeys - override fun toString(): String = "OdcVmTrace[$path]" + override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + + override fun toString(): String = "Table[name=$name]" + + override fun hashCode(): Int = Objects.hash(trace, name) + + override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt index a755a107..fd9536ab 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt @@ -20,28 +20,37 @@ * SOFTWARE. */ -package org.opendc.trace.wtf +package org.opendc.trace.internal -import org.opendc.trace.TABLE_TASKS import org.opendc.trace.Table import org.opendc.trace.Trace +import org.opendc.trace.spi.TraceFormat import java.nio.file.Path +import java.util.* +import java.util.concurrent.ConcurrentHashMap /** - * [Trace] implementation for the WTF format. + * Internal implementation of the [Trace] interface. */ -public class WtfTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) +internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace { + /** + * A map containing the [TableImpl] instances associated with the trace. + */ + private val tableMap = ConcurrentHashMap<String, TableImpl>() - override fun containsTable(name: String): Boolean = TABLE_TASKS == name + override val tables: List<String> = format.getTables(path) - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null + init { + for (table in tables) { + tableMap.computeIfAbsent(table) { TableImpl(this, it) } } - - return WtfTaskTable(path) } - override fun toString(): String = "WtfTrace[$path]" + override fun containsTable(name: String): Boolean = tableMap.containsKey(name) + + override fun getTable(name: String): Table? = tableMap[name] + + override fun hashCode(): Int = Objects.hash(format, path) + + override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt index d4da735e..1a9b9ee1 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -20,27 +20,18 @@ * SOFTWARE. */ -package org.opendc.trace.swf +package org.opendc.trace.spi -import org.opendc.trace.TABLE_TASKS import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path +import org.opendc.trace.TableColumn /** - * [Trace] implementation for the SWF format. + * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. + * + * @param columns The available columns in the table. + * @param partitionKeys The table columns that act as partition keys for the table. */ -public class SwfTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - return SwfTaskTable(path) - } - - override fun toString(): String = "SwfTrace[$path]" -} +public data class TableDetails( + val columns: List<TableColumn<*>>, + val partitionKeys: List<TableColumn<*>> = emptyList() +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 54029fcf..e04dd948 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,8 +22,8 @@ package org.opendc.trace.spi -import org.opendc.trace.Trace -import java.net.URL +import org.opendc.trace.TableReader +import java.nio.file.Path import java.util.* /** @@ -36,11 +36,32 @@ public interface TraceFormat { public val name: String /** - * Open a new [Trace] with this provider. + * Return the name of the tables available in the trace at the specified [path]. * - * @param url A reference to the trace. + * @param path The path to the trace. + * @return The list of tables available in the trace. */ - public fun open(url: URL): Trace + public fun getTables(path: Path): List<String> + + /** + * Return the details of [table] in the trace at the specified [path]. + * + * @param path The path to the trace. + * @param table The name of the table to obtain the details for. + * @throws IllegalArgumentException If [table] does not exist. + * @return The [TableDetails] for the specified [table]. + */ + public fun getDetails(path: Path, table: String): TableDetails + + /** + * Open a [TableReader] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableReader] for. + * @throws IllegalArgumentException If [table] does not exist. + * @return A [TableReader] instance for the table. + */ + public fun newReader(path: Path, table: String): TableReader /** * A helper object for resolving providers. @@ -49,6 +70,7 @@ public interface TraceFormat { /** * A list of [TraceFormat] that are available on this system. */ + @JvmStatic public val installedProviders: List<TraceFormat> by lazy { val loader = ServiceLoader.load(TraceFormat::class.java) loader.toList() @@ -57,6 +79,7 @@ public interface TraceFormat { /** * Obtain a [TraceFormat] implementation by [name]. */ + @JvmStatic public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name } } } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt deleted file mode 100644 index 285e7216..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } - - override fun toString(): String = "AzureResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt deleted file mode 100644 index ff7af172..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * The resource [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("No partition $partition") - } - - override fun toString(): String = "AzureResourceTable" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt deleted file mode 100644 index c7e7dc36..00000000 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Azure v1 VM traces. - */ -public class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = name in tables - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> AzureResourceTable(factory, path) - TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "AzureTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 1230d857..77af0d81 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the Azure v1 format. @@ -45,12 +50,60 @@ public class AzureTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT + ), + listOf(RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all VM CPU readings. */ - override fun open(url: URL): AzureTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return AzureTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (_, partPath) = it.next() + return AzureResourceStateTableReader(factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "AzureCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 2c1a2125..b73bb728 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [AzureTraceFormat] class. @@ -36,54 +35,29 @@ class AzureTraceFormatTest { private val format = AzureTraceFormat() @Test - fun testTraceExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace").toURI().toURL() - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testResources() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() - + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, @@ -96,10 +70,8 @@ class AzureTraceFormatTest { @Test fun testSmoke() { - val url = File("src/test/resources/trace").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/trace") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt deleted file mode 100644 index 7cb58226..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the extended Bitbrains format. - */ -internal class BitbrainsExResourceStateTable(path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } else { - null - } - } - - override fun toString(): String = "SvCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - val reader = path.bufferedReader() - return BitbrainsExResourceStateTableReader(reader) - } - - override fun toString(): String = "BitbrainsExResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt deleted file mode 100644 index f16c493d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTrace.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the extended Bitbrains format. - */ -public class BitbrainsExTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return BitbrainsExResourceStateTable(path) - } - - override fun toString(): String = "BitbrainsExTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 06388a84..080b73de 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -22,10 +22,16 @@ package org.opendc.trace.bitbrains +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.bufferedReader +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the extended Bitbrains trace format. @@ -36,12 +42,59 @@ public class BitbrainsExTraceFormat : TraceFormat { */ override val name: String = "bitbrains-ex" + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_CLUSTER_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_CPU_DEMAND, + RESOURCE_STATE_CPU_READY_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open the trace file. + * Construct a [TableReader] for reading over all resource state partitions. */ - override fun open(url: URL): BitbrainsExTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsExTrace(path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (_, partPath) = it.next() + return BitbrainsExResourceStateTableReader(partPath.bufferedReader()) + } else { + null + } + } + + override fun toString(): String = "BitbrainsExCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt deleted file mode 100644 index 7b08b8be..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import org.opendc.trace.util.CompositeTableReader -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the Bitbrains format. - */ -internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - RESOURCE_STATE_NET_RX, - RESOURCE_STATE_NET_TX, - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : CompositeTableReader() { - override fun nextReader(): TableReader? { - return if (it.hasNext()) { - val (partition, path) = it.next() - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "BitbrainsCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) - } - - override fun toString(): String = "BitbrainsResourceStateTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt deleted file mode 100644 index d024af2d..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resources [Table] in the Bitbrains format. - */ -internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The VMs that belong to the table. - */ - private val vms = - Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = true - - override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - return BitbrainsResourceTableReader(factory, vms) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } - - override fun toString(): String = "BitbrainsResourceTable" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt deleted file mode 100644 index bcd8dd52..00000000 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.bitbrains - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Bitbrains format. - */ -public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = tables.contains(name) - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> BitbrainsResourceTable(factory, path) - TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "BitbrainsTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index 55b11fe3..1573726f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -24,10 +24,15 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.CompositeTableReader +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension /** * A format implementation for the GWF trace format. @@ -45,12 +50,67 @@ public class BitbrainsTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID)) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + RESOURCE_STATE_NET_RX, + RESOURCE_STATE_NET_TX, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val vms = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + BitbrainsResourceTableReader(factory, vms) + } + TABLE_RESOURCE_STATES -> newResourceStateReader(path) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + /** - * Open a Bitbrains trace. + * Construct a [TableReader] for reading over all resource state partitions. */ - override fun open(url: URL): BitbrainsTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BitbrainsTrace(factory, path) + private fun newResourceStateReader(path: Path): TableReader { + val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + val it = partitions.iterator() + + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { + return if (it.hasNext()) { + val (partition, partPath) = it.next() + return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile())) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index 2e4f176a..d734cf5f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -26,62 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsExTraceFormat] class. */ -class BitbrainsExTraceFormatTest { +internal class BitbrainsExTraceFormatTest { private val format = BitbrainsExTraceFormat() @Test - fun testTraceExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/vm.txt") - assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/vm.txt") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/vm.txt") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testSmoke() { - val url = checkNotNull(BitbrainsExTraceFormatTest::class.java.getResource("/vm.txt")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/vm.txt") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index ff4a33f8..41e7def2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -26,66 +26,38 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [BitbrainsTraceFormat] class. */ class BitbrainsTraceFormatTest { - @Test - fun testTraceExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertDoesNotThrow { - format.open(url) - } - } - - @Test - fun testTraceDoesNotExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } + private val format = BitbrainsTraceFormat() @Test fun testTables() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) + val path = Paths.get("src/test/resources/bitbrains.csv") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/bitbrains.csv") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/bitbrains.csv") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testResources() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -98,11 +70,8 @@ class BitbrainsTraceFormatTest { @Test fun testSmoke() { - val format = BitbrainsTraceFormat() - val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/bitbrains.csv") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt deleted file mode 100644 index ca720de4..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * A [Table] containing the tasks in a GWF trace. - */ -internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(TASK_WORKFLOW_ID) - - override fun newReader(): TableReader { - return GwfTaskTableReader(factory.createParser(url)) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "GwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt deleted file mode 100644 index 166c1e56..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * [Trace] implementation for the GWF format. - */ -public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return GwfTaskTable(factory, url) - } - - override fun toString(): String = "GwfTrace[$url]" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 6d542503..0f7b9d6e 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -24,10 +24,10 @@ package org.opendc.trace.gwf import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the GWF trace format. @@ -45,12 +45,30 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) - /** - * Read the tasks in the GWF trace. - */ - public override fun open(url: URL): GwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return GwfTrace(factory, url) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + ), + listOf(TASK_WORKFLOW_ID) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index b209b979..7fe403b2 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -22,13 +22,10 @@ package org.opendc.trace.gwf +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -36,59 +33,32 @@ import java.time.Instant * Test suite for the [GwfTraceFormat] class. */ internal class GwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = GwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testTableReader() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - val reader = table.newReader() + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -99,13 +69,4 @@ internal class GwfTraceFormatTest { { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, ) } - - @Test - fun testTableReaderPartition() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt deleted file mode 100644 index caacf192..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource state [Table] in the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCE_STATES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) - return OdcVmResourceStateTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt deleted file mode 100644 index 653b28b8..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource [Table] for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY, - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) - return OdcVmResourceTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 8edba725..29818147 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -24,11 +24,13 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. @@ -39,13 +41,45 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" - /** - * Open a Bitbrains Parquet trace. - */ - override fun open(url: URL): OdcVmTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return OdcVmTrace(path) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_DURATION, + RESOURCE_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) + OdcVmResourceTableReader(reader) + } + TABLE_RESOURCE_STATES -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) + OdcVmResourceStateTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } public companion object { diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 9fb6028d..bfe0f881 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,8 +29,7 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [OdcVmTraceFormat] implementation. @@ -39,52 +38,30 @@ internal class OdcVmTraceFormatTest { private val format = OdcVmTraceFormat() @Test - fun testTraceExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertDoesNotThrow { format.open(url) } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertThrows<IllegalArgumentException> { - format.open(URL(url.toString() + "help")) - } - } - - @Test fun testTables() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace-v2.1") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace-v2.1") + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -104,10 +81,8 @@ internal class OdcVmTraceFormatTest { @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt deleted file mode 100644 index 4898779d..00000000 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.* -import java.nio.file.Path -import kotlin.io.path.bufferedReader - -/** - * A [Table] containing the tasks in a SWF trace. - */ -internal class SwfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - val reader = path.bufferedReader() - return SwfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "SwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 36c3122e..4cb7e49e 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -22,10 +22,11 @@ package org.opendc.trace.swf +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path +import kotlin.io.path.bufferedReader /** * Support for the Standard Workload Format (SWF) in OpenDC. @@ -35,9 +36,33 @@ import kotlin.io.path.exists public class SwfTraceFormat : TraceFormat { override val name: String = "swf" - override fun open(url: URL): SwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return SwfTrace(path) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 828c2bfa..4dcd43f6 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -27,61 +27,38 @@ import org.junit.jupiter.api.Assertions.* import org.opendc.trace.TABLE_TASKS import org.opendc.trace.TASK_ALLOC_NCPUS import org.opendc.trace.TASK_ID -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [SwfTraceFormat] class. */ internal class SwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = SwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } @Test fun testReader() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -94,14 +71,4 @@ internal class SwfTraceFormatTest { reader.close() } - - @Test - fun testReaderPartition() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - - assertThrows<IllegalArgumentException> { - trace.getTable(TABLE_TASKS)!!.newReader("test") - } - } } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 0b089904..cd5d287f 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -35,9 +35,6 @@ import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* -import org.opendc.trace.azure.AzureTraceFormat -import org.opendc.trace.bitbrains.BitbrainsExTraceFormat -import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.opendc.OdcVmTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File @@ -78,11 +75,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { * The input format of the trace. */ private val format by option("-f", "--format", help = "input format of trace") - .choice( - "solvinity" to BitbrainsExTraceFormat(), - "bitbrains" to BitbrainsTraceFormat(), - "azure" to AzureTraceFormat() - ) + .choice("bitbrains-ex", "bitbrains", "azure") .required() /** @@ -101,7 +94,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - val trace = format.open(input.toURI().toURL()) + val trace = Trace.open(input, format = format) logger.info { "Building resources table" } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt deleted file mode 100644 index 17aeee97..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a WfCommons workload trace. - */ -internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN - ) - - override val partitionKeys: List<TableColumn<*>> = emptyList() - - override fun newReader(): TableReader { - val parser = factory.createParser(path.toFile()) - return WfFormatTaskTableReader(parser) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WfFormatTaskTable" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt deleted file mode 100644 index 2d9c79fb..00000000 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wfformat - -import com.fasterxml.jackson.core.JsonFactory -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * [Trace] implementation for the WfCommons workload trace format. - */ -public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_TASKS -> WfFormatTaskTable(factory, path) - else -> null - } - } - - override fun toString(): String = "WfFormatTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index ff8d054c..825c3d6d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -23,10 +23,10 @@ package org.opendc.trace.wfformat import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the WfCommons workload trace format. @@ -39,9 +39,29 @@ public class WfFormatTraceFormat : TraceFormat { override val name: String = "wfformat" - override fun open(url: URL): WfFormatTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WfFormatTrace(factory, path) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 0bfc8840..217b175d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -22,59 +22,38 @@ package org.opendc.trace.wfformat +import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [WfFormatTraceFormat] class. */ class WfFormatTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertDoesNotThrow { format.open(input) } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) } - } + private val format = WfFormatTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get("src/test/resources/trace.json") + Assertions.assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/trace.json") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } /** @@ -82,9 +61,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -110,9 +88,8 @@ class WfFormatTraceFormatTest { */ @Test fun testTableReaderFull() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val trace = WfFormatTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/trace.json") + val reader = format.newReader(path, TABLE_TASKS) assertDoesNotThrow { while (reader.nextRow()) { @@ -121,13 +98,4 @@ class WfFormatTraceFormatTest { reader.close() } } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/trace.json").toURI().toURL() - val format = WfFormatTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt deleted file mode 100644 index 410bb347..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * A [Table] containing the tasks in a GWF trace. - */ -internal class WtfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override val partitionKeys: List<TableColumn<*>> = listOf(TASK_SUBMIT_TIME) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) - return WtfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "WtfTaskTable" -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index 781cb335..2f17694f 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -22,10 +22,12 @@ package org.opendc.trace.wtf +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path /** * A [TraceFormat] implementation for the Workflow Trace Format (WTF). @@ -33,9 +35,36 @@ import kotlin.io.path.exists public class WtfTraceFormat : TraceFormat { override val name: String = "wtf" - override fun open(url: URL): WtfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return WtfTrace(path) + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ), + listOf(TASK_SUBMIT_TIME) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> { + val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) + WtfTaskTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index b155f265..09c3703a 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -35,51 +34,25 @@ import java.time.Instant * Test suite for the [WtfTraceFormat] class. */ class WtfTraceFormatTest { - @Test - fun testTraceExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - org.junit.jupiter.api.assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - assertThrows<IllegalArgumentException> { - format.open(URL(input.toString() + "help")) - } - } + private val format = WtfTraceFormat() @Test fun testTables() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) - - assertEquals(listOf(TABLE_TASKS), trace.tables) + val path = Paths.get("src/test/resources/wtf-trace") + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - org.junit.jupiter.api.assertDoesNotThrow { table!!.newReader() } + val path = Paths.get("src/test/resources/wtf-trace") + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val trace = format.open(input) + val path = Paths.get("src/test/resources/wtf-trace") - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } } /** @@ -87,9 +60,8 @@ class WtfTraceFormatTest { */ @Test fun testTableReader() { - val input = File("src/test/resources/wtf-trace") - val trace = WtfTraceFormat().open(input.toURI().toURL()) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get("src/test/resources/wtf-trace") + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -111,13 +83,4 @@ class WtfTraceFormatTest { reader.close() } - - @Test - fun testTableReaderPartition() { - val input = File("src/test/resources/wtf-trace").toURI().toURL() - val format = WtfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows<IllegalArgumentException> { table.newReader("test") } - } } diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 941202d2..43b64b15 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,7 +39,8 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcTrace.opendcTraceGwf) + testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 74316437..728dfd99 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -44,13 +44,14 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock -import org.opendc.trace.gwf.GwfTraceFormat +import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.nio.file.Paths import java.time.Duration import java.util.* @@ -105,7 +106,10 @@ internal class WorkflowServiceTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf"))) + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) val replayer = TraceReplayer(trace) replayer.replay(clock, scheduler) |
