diff options
Diffstat (limited to 'opendc-format/src/main/kotlin')
19 files changed, 1592 insertions, 0 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt new file mode 100644 index 00000000..97d6f239 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.environment + +import java.io.Closeable + +/** + * An interface for reading descriptions of topology environments into memory. + */ +public interface EnvironmentReader : Closeable { + /** + * Read the environment into a list. + */ + public fun read(): List<MachineDef> +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt new file mode 100644 index 00000000..9b799cc2 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.environment + +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.power.PowerModel +import java.util.* + +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: SimMachineModel, + val powerModel: PowerModel +) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt new file mode 100644 index 00000000..c313467f --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.environment.sc18 + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo + +/** + * A topology setup. + * + * @property name The name of the setup. + * @property rooms The rooms in the topology. + */ +internal data class Setup(val name: String, val rooms: List<Room>) + +/** + * A room in a topology. + * + * @property type The type of room in the topology. + * @property objects The objects in the room. + */ +internal data class Room(val type: String, val objects: List<RoomObject>) + +/** + * An object in a [Room]. + * + * @property type The type of the room object. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) +internal sealed class RoomObject(val type: String) { + /** + * A rack in a server room. + * + * @property machines The machines in the rack. + */ + internal data class Rack(val machines: List<Machine>) : RoomObject("RACK") +} + +/** + * A machine in the setup that consists of the specified CPU's represented as + * integer identifiers and ethernet speed. + * + * @property cpus The CPUs in the machine represented as integer identifiers. + */ +internal data class Machine(val cpus: List<Int>) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt new file mode 100644 index 00000000..1f080c2d --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.environment.sc18 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.MachineDef +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.ConstantPowerModel +import java.io.InputStream +import java.util.* + +/** + * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology + * Schedulers". + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { + /** + * The environment that was read from the file. + */ + private val setup: Setup = mapper.readValue(input) + + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { + var counter = 0 + return setup.rooms.flatMap { room -> + room.objects.flatMap { roomObject -> + when (roomObject) { + is RoomObject.Rack -> { + roomObject.machines.map { machine -> + val cores = machine.cpus.flatMap { id -> + when (id) { + 1 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } + } + 2 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } + } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", + emptyMap(), + SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), + ConstantPowerModel(0.0) + ) + } + } + } + } + } + } + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt new file mode 100644 index 00000000..58af8453 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.environment.sc20 + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo + +/** + * A topology setup. + * + * @property name The name of the setup. + * @property rooms The rooms in the topology. + */ +internal data class Setup(val name: String, val rooms: List<Room>) + +/** + * A room in a topology. + * + * @property type The type of room in the topology. + * @property objects The objects in the room. + */ +internal data class Room(val type: String, val objects: List<RoomObject>) + +/** + * An object in a [Room]. + * + * @property type The type of the room object. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) +internal sealed class RoomObject(val type: String) { + /** + * A rack in a server room. + * + * @property machines The machines in the rack. + */ + internal data class Rack(val machines: List<Machine>) : RoomObject("RACK") +} + +/** + * A machine in the setup that consists of the specified CPU's represented as + * integer identifiers and ethernet speed. + * + * @property cpus The CPUs in the machine represented as integer identifiers. + * @property memories The memories in the machine represented as integer identifiers. + */ +internal data class Machine(val cpus: List<Int>, val memories: List<Int>) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt new file mode 100644 index 00000000..cf90da68 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.environment.sc20 + +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.MachineDef +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.LinearPowerModel +import java.io.File +import java.io.FileInputStream +import java.io.InputStream +import java.util.* + +/** + * A [EnvironmentReader] for the internal environment format. + * + * @param environmentFile The file describing the physical cluster. + */ +public class Sc20ClusterEnvironmentReader( + private val input: InputStream +) : EnvironmentReader { + + public constructor(file: File) : this(FileInputStream(file)) + + public override fun read(): List<MachineDef> { + var clusterIdCol = 0 + var speedCol = 0 + var numberOfHostsCol = 0 + var memoryPerHostCol = 0 + var coresPerHostCol = 0 + + var clusterIdx: Int = 0 + var clusterId: String + var speed: Double + var numberOfHosts: Int + var memoryPerHost: Long + var coresPerHost: Int + + val nodes = mutableListOf<MachineDef>() + val random = Random(0) + + input.bufferedReader().use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the file + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";") + + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + clusterIdCol = header["ClusterID"]!! + speedCol = header["Speed"]!! + numberOfHostsCol = header["numberOfHosts"]!! + memoryPerHostCol = header["memoryCapacityPerHost"]!! + coresPerHostCol = header["coreCountPerHost"]!! + return@forEachIndexed + } + + clusterIdx++ + clusterId = values[clusterIdCol].trim() + speed = values[speedCol].trim().toDouble() * 1000.0 + numberOfHosts = values[numberOfHostsCol].trim().toInt() + memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L + coresPerHost = values[coresPerHostCol].trim().toInt() + + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + + repeat(numberOfHosts) { + nodes.add( + MachineDef( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$it", + mapOf("cluster" to clusterId), + SimMachineModel( + List(coresPerHost) { coreId -> + ProcessingUnit(unknownProcessingNode, coreId, speed) + }, + listOf(unknownMemoryUnit) + ), + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + LinearPowerModel(350.0, 200 / 350.0) + ) + ) + } + } + } + + return nodes + } + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt new file mode 100644 index 00000000..c6a19430 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.environment.sc20 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.MachineDef +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.LinearPowerModel +import java.io.InputStream +import java.util.* + +/** + * A parser for the JSON experiment setup files used for the SC20 paper. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { + /** + * The environment that was read from the file. + */ + private val setup: Setup = mapper.readValue(input) + + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { + var counter = 0 + return setup.rooms.flatMap { room -> + room.objects.flatMap { roomObject -> + when (roomObject) { + is RoomObject.Rack -> { + roomObject.machines.map { machine -> + val cores = machine.cpus.flatMap { id -> + when (id) { + 1 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } + } + 2 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } + } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + val memories = machine.memories.map { id -> + when (id) { + 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", + emptyMap(), + SimMachineModel(cores, memories), + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + LinearPowerModel(350.0, 200 / 350.0) + ) + } + } + } + } + } + } + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt new file mode 100644 index 00000000..f30e64cf --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace + +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import java.io.Closeable +import kotlin.random.Random + +/** + * An interface for reading descriptions of performance interference models into memory. + */ +public interface PerformanceInterferenceModelReader : Closeable { + /** + * Construct a [PerformanceInterferenceModel]. + */ + public fun construct(random: Random): Map<String, PerformanceInterferenceModel> +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt new file mode 100644 index 00000000..3ce79d69 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace + +import java.util.UUID + +/** + * An entry in a workload trace. + * + * @param uid The unique identifier of the entry. + * @param name The name of the entry. + * @param start The start time of the workload. + * @param workload The workload of the entry. + * @param meta The meta-data associated with the workload. + */ +public data class TraceEntry<out T>( + val uid: UUID, + val name: String, + val start: Long, + val workload: T, + val meta: Map<String, Any> +) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt new file mode 100644 index 00000000..7df1acd3 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace + +import java.io.Closeable + +/** + * An interface for reading workloads into memory. + * + * This interface must guarantee that the entries are delivered in order of submission time. + * + * @param T The shape of the workloads supported by this reader. + */ +public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt new file mode 100644 index 00000000..6861affe --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace + +import java.io.Closeable + +/** + * An interface for reading VM placement data into memory. + */ +public interface VmPlacementReader : Closeable { + /** + * Construct a map of VMs to clusters. + */ + public fun construct(): Map<String, String> +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt new file mode 100644 index 00000000..769b2b13 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.bitbrains + +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.* +import kotlin.math.min + +/** + * A [TraceReader] for the public VM workload trace format. + * + * @param traceDirectory The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +public class BitbrainsTraceReader( + traceDirectory: File, + performanceInterferenceModel: PerformanceInterferenceModel +) : TraceReader<SimWorkload> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<SimWorkload>> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() + + var timestampCol = 0 + var coreCol = 0 + var cpuUsageCol = 0 + var provisionedMemoryCol = 0 + val traceInterval = 5 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .forEach { vmFile -> + println(vmFile) + val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() + var vmId = -1L + var cores = -1 + var requiredMemory = -1L + var startTime = -1L + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";\t") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + timestampCol = header["Timestamp [ms]"]!! + coreCol = header["CPU cores"]!! + cpuUsageCol = header["CPU usage [MHZ]"]!! + provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!! + return@forEachIndexed + } + + vmId = vmFile.nameWithoutExtension.trim().toLong() + startTime = min(startTime, values[timestampCol].trim().toLong() - 5 * 60) + cores = values[coreCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() + + if (flopsHistory.isEmpty()) { + flopsHistory.add(SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores)) + } else { + if (flopsHistory.last().usage != cpuUsage) { + flopsHistory.add( + SimTraceWorkload.Fragment( + traceInterval, + cpuUsage, + cores + ) + ) + } else { + val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) + flopsHistory.add( + SimTraceWorkload.Fragment( + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + ) + } + } + } + } + + val uuid = UUID(0L, vmId) + + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) } + .toSortedSet() + ) + + val workload = SimTraceWorkload(flopsHistory.asSequence()) + entries[vmId] = TraceEntry( + uuid, + vmId.toString(), + startTime, + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.start }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<SimWorkload> = iterator.next() + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt new file mode 100644 index 00000000..e68afeb7 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.gwf + +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.io.BufferedReader +import java.io.File +import java.io.InputStream +import java.util.* +import kotlin.collections.HashSet +import kotlin.collections.Iterator +import kotlin.collections.List +import kotlin.collections.MutableSet +import kotlin.collections.component1 +import kotlin.collections.component2 +import kotlin.collections.filter +import kotlin.collections.forEach +import kotlin.collections.getOrPut +import kotlin.collections.map +import kotlin.collections.mapIndexed +import kotlin.collections.mapOf +import kotlin.collections.mutableMapOf +import kotlin.collections.set +import kotlin.collections.sortedBy +import kotlin.collections.toMap +import kotlin.math.max +import kotlin.math.min + +/** + * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more + * information about the format. + * + * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore + * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a + * different format for better performance. + * + * @param reader The buffered reader to read the trace with. + */ +public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<Job>> + + /** + * Create a [GwfTraceReader] instance from the specified [File]. + * + * @param file The file to read from. + */ + public constructor(file: File) : this(file.bufferedReader()) + + /** + * Create a [GwfTraceReader] instance from the specified [InputStream]. + * + * @param input The input stream to read from. + */ + public constructor(input: InputStream) : this(input.bufferedReader()) + + /** + * Initialize the reader. + */ + init { + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, List<Long>>() + + var workflowIdCol = 0 + var taskIdCol = 0 + var submitTimeCol = 0 + var runtimeCol = 0 + var coreCol = 0 + var dependencyCol = 0 + + try { + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(",") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + workflowIdCol = header["WorkflowID"]!! + taskIdCol = header["JobID"]!! + submitTimeCol = header["SubmitTime"]!! + runtimeCol = header["RunTime"]!! + coreCol = header["NProcs"]!! + dependencyCol = header["Dependencies"]!! + return@forEachIndexed + } + + val workflowId = values[workflowIdCol].trim().toLong() + val taskId = values[taskIdCol].trim().toLong() + val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms + val runtime = max(0, values[runtimeCol].trim().toLong()) // s + val cores = values[coreCol].trim().toInt() + val dependencies = values[dependencyCol].split(" ") + .filter { it.isNotEmpty() } + .map { it.trim().toLong() } + + val flops: Long = 4000 * runtime * cores + + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) + } + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, taskId), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to (runtime * 1000) + ), + ) + starts.merge(workflowId, submitTime, ::min) + (workflow.tasks as MutableSet<Task>).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } + } finally { + reader.close() + } + + // Fix dependencies and dependents for all tasks + taskDependencies.forEach { (task, dependencies) -> + (task.dependencies as MutableSet<Task>).addAll( + dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + } + ) + } + + // Create the entry iterator + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<Job> = iterator.next() + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt new file mode 100644 index 00000000..0da1f7c2 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt @@ -0,0 +1,7 @@ +package org.opendc.format.trace.sc20 + +internal data class PerformanceInterferenceEntry( + val vms: List<String>, + val minServerLoad: Double, + val performanceScore: Double +) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt new file mode 100644 index 00000000..4267737d --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.sc20 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.format.trace.PerformanceInterferenceModelReader +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import java.io.InputStream +import java.util.* +import kotlin.random.Random + +/** + * A parser for the JSON performance interference setup files used for the SC20 paper. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +public class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : + PerformanceInterferenceModelReader { + /** + * The computed value from the file. + */ + private val items: Map<String, TreeSet<PerformanceInterferenceModel.Item>> + + init { + val entries: List<PerformanceInterferenceEntry> = mapper.readValue(input) + val res = mutableMapOf<String, TreeSet<PerformanceInterferenceModel.Item>>() + for (entry in entries) { + val item = PerformanceInterferenceModel.Item(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore) + for (workload in entry.vms) { + res.computeIfAbsent(workload) { TreeSet() }.add(item) + } + } + + items = res + } + + override fun construct(random: Random): Map<String, PerformanceInterferenceModel> { + return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) } + } + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt new file mode 100644 index 00000000..1eb4bac2 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.sc20 + +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.* +import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceDirectory The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +public class Sc20TraceReader( + traceDirectory: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List<String>, + random: Random +) : TraceReader<SimWorkload> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<SimWorkload>> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>() + + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + val vms = if (selectedVms.isEmpty()) { + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + } else { + selectedVms.map { + File(traceDirectory, it) + } + } + + vms + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var timestamp: Long + var cores = -1 + var minTime = Long.MAX_VALUE + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEach { line -> + val values = line.split(" ") + + vmId = vmFile.name + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + } + } + + val flopsFragments = sequence { + var last: SimTraceWorkload.Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) { + val oldFragment = last!! + SimTraceWorkload.Fragment( + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + + if (last != null) { + yield(last!!) + } + } + } + + val uuid = UUID(0, idx.toLong()) + + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), + Random(random.nextInt()) + ) + val workload = SimTraceWorkload(flopsFragments.asSequence()) + entries[uuid] = TraceEntry( + uuid, + vmId, + minTime, + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.start }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<SimWorkload> = iterator.next() + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt new file mode 100644 index 00000000..61bdea60 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.sc20 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.format.trace.VmPlacementReader +import java.io.InputStream + +/** + * A parser for the JSON VM placement data files used for the SC20 paper. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +public class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : + VmPlacementReader { + /** + * The environment that was read from the file. + */ + private val placements = mapper.readValue<Map<String, String>>(input) + + override fun construct(): Map<String, String> { + return placements + .mapKeys { "vm__workload__${it.key}.txt" } + .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 + } + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt new file mode 100644 index 00000000..0d1f3cea --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.swf + +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.* + +/** + * A [TraceReader] for reading SWF traces into VM-modeled workloads. + * + * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html + * + * @param file The trace file. + */ +public class SwfTraceReader( + file: File, + maxNumCores: Int = -1 +) : TraceReader<SimWorkload> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<SimWorkload>> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() + + val jobNumberCol = 0 + val submitTimeCol = 1 // seconds (begin of trace is 0) + val waitTimeCol = 2 // seconds + val runTimeCol = 3 // seconds + val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time + val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified) + + val sliceDuration = 5 * 60L + + var jobNumber: Long + var submitTime: Long + var waitTime: Long + var runTime: Long + var cores: Int + var memory: Long + var slicedWaitTime: Long + var flopsPerSecond: Long + var flopsPartialSlice: Long + var runtimePartialSliceRemainder: Long + + BufferedReader(FileReader(file)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith(";") && line.isNotBlank() + } + .forEach { line -> + val values = line.trim().split("\\s+".toRegex()) + + jobNumber = values[jobNumberCol].trim().toLong() + submitTime = values[submitTimeCol].trim().toLong() + waitTime = values[waitTimeCol].trim().toLong() + runTime = values[runTimeCol].trim().toLong() + cores = values[numAllocatedCoresCol].trim().toInt() + memory = values[requestedMemoryCol].trim().toLong() + + if (maxNumCores != -1 && cores > maxNumCores) { + println("Skipped a task due to processor count ($cores > $maxNumCores).") + return@forEach + } + + if (memory == -1L) { + memory = 1000L * cores // assume 1GB of memory per processor if not specified + } else { + memory /= 1000 // convert KB to MB + } + + val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() + + // Insert waiting time slices + + // We ignore wait time remainders under one + slicedWaitTime = 0L + if (waitTime >= sliceDuration) { + for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { + flopsHistory.add( + SimTraceWorkload.Fragment( + sliceDuration * 1000L, + 0.0, + cores + ) + ) + slicedWaitTime += sliceDuration + } + } + + // Insert run time slices + + flopsPerSecond = 4_000L * cores + runtimePartialSliceRemainder = runTime % sliceDuration + flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder + + for ( + tick in (submitTime + slicedWaitTime) + until (submitTime + slicedWaitTime + runTime - sliceDuration) + step sliceDuration + ) { + flopsHistory.add( + SimTraceWorkload.Fragment( + sliceDuration * 1000L, + 1.0, + cores + ) + ) + } + + if (runtimePartialSliceRemainder > 0) { + flopsHistory.add( + SimTraceWorkload.Fragment( + sliceDuration, + runtimePartialSliceRemainder / sliceDuration.toDouble(), + cores + ) + ) + } + + val uuid = UUID(0L, jobNumber) + val workload = SimTraceWorkload(flopsHistory.asSequence()) + entries[jobNumber] = TraceEntry( + uuid, + jobNumber.toString(), + submitTime, + workload, + mapOf( + "cores" to cores, + "required-memory" to memory, + "workload" to workload + ) + ) + } + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.start }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<SimWorkload> = iterator.next() + + override fun close() {} +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt new file mode 100644 index 00000000..feadf61f --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.wtf + +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.util.UUID +import kotlin.math.min + +/** + * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive + * (https://wta.atlarge-research.com/) for more information about the format. + * + * @param path The path to the trace. + */ +public class WtfTraceReader(path: String) : TraceReader<Job> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<Job>> + + /** + * Initialize the reader. + */ + init { + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, List<Long>>() + + @Suppress("DEPRECATION") + val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build() + + while (true) { + val nextRecord = reader.read() ?: break + + val workflowId = nextRecord.get("workflow_id") as Long + val taskId = nextRecord.get("id") as Long + val submitTime = nextRecord.get("ts_submit") as Long + val runtime = nextRecord.get("runtime") as Long + val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() + @Suppress("UNCHECKED_CAST") + val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map { + it.get("item") as Long + } + + val flops: Long = 4100 * (runtime / 1000) * cores + + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) + } + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, taskId), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to runtime + ) + ) + + starts.merge(workflowId, submitTime, ::min) + (workflow.tasks as MutableSet<Task>).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } + + // Fix dependencies and dependents for all tasks + taskDependencies.forEach { (task, dependencies) -> + (task.dependencies as MutableSet<Task>).addAll( + dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + } + ) + } + + // Create the entry iterator + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<Job> = iterator.next() + + override fun close() {} +} |
