summaryrefslogtreecommitdiff
path: root/opendc-format/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-format/src/main/kotlin')
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt35
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt35
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt66
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt89
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt67
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt120
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt97
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt37
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt44
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt34
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt37
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt156
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt176
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt7
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt65
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt181
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt51
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt177
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt118
19 files changed, 1592 insertions, 0 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
new file mode 100644
index 00000000..97d6f239
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.environment
+
+import java.io.Closeable
+
+/**
+ * An interface for reading descriptions of topology environments into memory.
+ */
+public interface EnvironmentReader : Closeable {
+ /**
+ * Read the environment into a list.
+ */
+ public fun read(): List<MachineDef>
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
new file mode 100644
index 00000000..9b799cc2
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.environment
+
+import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.power.PowerModel
+import java.util.*
+
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: SimMachineModel,
+ val powerModel: PowerModel
+)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt
new file mode 100644
index 00000000..c313467f
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.environment.sc18
+
+import com.fasterxml.jackson.annotation.JsonSubTypes
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+/**
+ * A topology setup.
+ *
+ * @property name The name of the setup.
+ * @property rooms The rooms in the topology.
+ */
+internal data class Setup(val name: String, val rooms: List<Room>)
+
+/**
+ * A room in a topology.
+ *
+ * @property type The type of room in the topology.
+ * @property objects The objects in the room.
+ */
+internal data class Room(val type: String, val objects: List<RoomObject>)
+
+/**
+ * An object in a [Room].
+ *
+ * @property type The type of the room object.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)])
+internal sealed class RoomObject(val type: String) {
+ /**
+ * A rack in a server room.
+ *
+ * @property machines The machines in the rack.
+ */
+ internal data class Rack(val machines: List<Machine>) : RoomObject("RACK")
+}
+
+/**
+ * A machine in the setup that consists of the specified CPU's represented as
+ * integer identifiers and ethernet speed.
+ *
+ * @property cpus The CPUs in the machine represented as integer identifiers.
+ */
+internal data class Machine(val cpus: List<Int>)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
new file mode 100644
index 00000000..1f080c2d
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.environment.sc18
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.MachineDef
+import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.ConstantPowerModel
+import java.io.InputStream
+import java.util.*
+
+/**
+ * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology
+ * Schedulers".
+ *
+ * @param input The input stream to read from.
+ * @param mapper The Jackson object mapper to use.
+ */
+public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
+ /**
+ * The environment that was read from the file.
+ */
+ private val setup: Setup = mapper.readValue(input)
+
+ /**
+ * Read the environment.
+ */
+ public override fun read(): List<MachineDef> {
+ var counter = 0
+ return setup.rooms.flatMap { room ->
+ room.objects.flatMap { roomObject ->
+ when (roomObject) {
+ is RoomObject.Rack -> {
+ roomObject.machines.map { machine ->
+ val cores = machine.cpus.flatMap { id ->
+ when (id) {
+ 1 -> {
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
+ }
+ 2 -> {
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
+ }
+ else -> throw IllegalArgumentException("The cpu id $id is not recognized")
+ }
+ }
+ MachineDef(
+ UUID(0L, counter++.toLong()),
+ "node-$counter",
+ emptyMap(),
+ SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
+ ConstantPowerModel(0.0)
+ )
+ }
+ }
+ }
+ }
+ }
+ }
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt
new file mode 100644
index 00000000..58af8453
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.environment.sc20
+
+import com.fasterxml.jackson.annotation.JsonSubTypes
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+/**
+ * A topology setup.
+ *
+ * @property name The name of the setup.
+ * @property rooms The rooms in the topology.
+ */
+internal data class Setup(val name: String, val rooms: List<Room>)
+
+/**
+ * A room in a topology.
+ *
+ * @property type The type of room in the topology.
+ * @property objects The objects in the room.
+ */
+internal data class Room(val type: String, val objects: List<RoomObject>)
+
+/**
+ * An object in a [Room].
+ *
+ * @property type The type of the room object.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)])
+internal sealed class RoomObject(val type: String) {
+ /**
+ * A rack in a server room.
+ *
+ * @property machines The machines in the rack.
+ */
+ internal data class Rack(val machines: List<Machine>) : RoomObject("RACK")
+}
+
+/**
+ * A machine in the setup that consists of the specified CPU's represented as
+ * integer identifiers and ethernet speed.
+ *
+ * @property cpus The CPUs in the machine represented as integer identifiers.
+ * @property memories The memories in the machine represented as integer identifiers.
+ */
+internal data class Machine(val cpus: List<Int>, val memories: List<Int>)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
new file mode 100644
index 00000000..cf90da68
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.environment.sc20
+
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.MachineDef
+import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import java.io.File
+import java.io.FileInputStream
+import java.io.InputStream
+import java.util.*
+
+/**
+ * A [EnvironmentReader] for the internal environment format.
+ *
+ * @param environmentFile The file describing the physical cluster.
+ */
+public class Sc20ClusterEnvironmentReader(
+ private val input: InputStream
+) : EnvironmentReader {
+
+ public constructor(file: File) : this(FileInputStream(file))
+
+ public override fun read(): List<MachineDef> {
+ var clusterIdCol = 0
+ var speedCol = 0
+ var numberOfHostsCol = 0
+ var memoryPerHostCol = 0
+ var coresPerHostCol = 0
+
+ var clusterIdx: Int = 0
+ var clusterId: String
+ var speed: Double
+ var numberOfHosts: Int
+ var memoryPerHost: Long
+ var coresPerHost: Int
+
+ val nodes = mutableListOf<MachineDef>()
+ val random = Random(0)
+
+ input.bufferedReader().use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the file
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEachIndexed { idx, line ->
+ val values = line.split(";")
+
+ if (idx == 0) {
+ val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
+ clusterIdCol = header["ClusterID"]!!
+ speedCol = header["Speed"]!!
+ numberOfHostsCol = header["numberOfHosts"]!!
+ memoryPerHostCol = header["memoryCapacityPerHost"]!!
+ coresPerHostCol = header["coreCountPerHost"]!!
+ return@forEachIndexed
+ }
+
+ clusterIdx++
+ clusterId = values[clusterIdCol].trim()
+ speed = values[speedCol].trim().toDouble() * 1000.0
+ numberOfHosts = values[numberOfHostsCol].trim().toInt()
+ memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
+ coresPerHost = values[coresPerHostCol].trim().toInt()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+
+ repeat(numberOfHosts) {
+ nodes.add(
+ MachineDef(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$it",
+ mapOf("cluster" to clusterId),
+ SimMachineModel(
+ List(coresPerHost) { coreId ->
+ ProcessingUnit(unknownProcessingNode, coreId, speed)
+ },
+ listOf(unknownMemoryUnit)
+ ),
+ // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
+ // power draw of 350W.
+ // Source: https://stackoverflow.com/questions/6128960
+ LinearPowerModel(350.0, 200 / 350.0)
+ )
+ )
+ }
+ }
+ }
+
+ return nodes
+ }
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
new file mode 100644
index 00000000..c6a19430
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.environment.sc20
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.MachineDef
+import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import java.io.InputStream
+import java.util.*
+
+/**
+ * A parser for the JSON experiment setup files used for the SC20 paper.
+ *
+ * @param input The input stream to read from.
+ * @param mapper The Jackson object mapper to use.
+ */
+public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
+ /**
+ * The environment that was read from the file.
+ */
+ private val setup: Setup = mapper.readValue(input)
+
+ /**
+ * Read the environment.
+ */
+ public override fun read(): List<MachineDef> {
+ var counter = 0
+ return setup.rooms.flatMap { room ->
+ room.objects.flatMap { roomObject ->
+ when (roomObject) {
+ is RoomObject.Rack -> {
+ roomObject.machines.map { machine ->
+ val cores = machine.cpus.flatMap { id ->
+ when (id) {
+ 1 -> {
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
+ }
+ 2 -> {
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
+ }
+ else -> throw IllegalArgumentException("The cpu id $id is not recognized")
+ }
+ }
+ val memories = machine.memories.map { id ->
+ when (id) {
+ 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
+ else -> throw IllegalArgumentException("The cpu id $id is not recognized")
+ }
+ }
+ MachineDef(
+ UUID(0L, counter++.toLong()),
+ "node-$counter",
+ emptyMap(),
+ SimMachineModel(cores, memories),
+ // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
+ // power draw of 350W.
+ // Source: https://stackoverflow.com/questions/6128960
+ LinearPowerModel(350.0, 200 / 350.0)
+ )
+ }
+ }
+ }
+ }
+ }
+ }
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt
new file mode 100644
index 00000000..f30e64cf
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace
+
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import java.io.Closeable
+import kotlin.random.Random
+
+/**
+ * An interface for reading descriptions of performance interference models into memory.
+ */
+public interface PerformanceInterferenceModelReader : Closeable {
+ /**
+ * Construct a [PerformanceInterferenceModel].
+ */
+ public fun construct(random: Random): Map<String, PerformanceInterferenceModel>
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
new file mode 100644
index 00000000..3ce79d69
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
@@ -0,0 +1,44 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace
+
+import java.util.UUID
+
+/**
+ * An entry in a workload trace.
+ *
+ * @param uid The unique identifier of the entry.
+ * @param name The name of the entry.
+ * @param start The start time of the workload.
+ * @param workload The workload of the entry.
+ * @param meta The meta-data associated with the workload.
+ */
+public data class TraceEntry<out T>(
+ val uid: UUID,
+ val name: String,
+ val start: Long,
+ val workload: T,
+ val meta: Map<String, Any>
+)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
new file mode 100644
index 00000000..7df1acd3
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace
+
+import java.io.Closeable
+
+/**
+ * An interface for reading workloads into memory.
+ *
+ * This interface must guarantee that the entries are delivered in order of submission time.
+ *
+ * @param T The shape of the workloads supported by this reader.
+ */
+public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt
new file mode 100644
index 00000000..6861affe
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt
@@ -0,0 +1,37 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace
+
+import java.io.Closeable
+
+/**
+ * An interface for reading VM placement data into memory.
+ */
+public interface VmPlacementReader : Closeable {
+ /**
+ * Construct a map of VMs to clusters.
+ */
+ public fun construct(): Map<String, String>
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
new file mode 100644
index 00000000..769b2b13
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.bitbrains
+
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.*
+import kotlin.math.min
+
+/**
+ * A [TraceReader] for the public VM workload trace format.
+ *
+ * @param traceDirectory The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+public class BitbrainsTraceReader(
+ traceDirectory: File,
+ performanceInterferenceModel: PerformanceInterferenceModel
+) : TraceReader<SimWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
+
+ var timestampCol = 0
+ var coreCol = 0
+ var cpuUsageCol = 0
+ var provisionedMemoryCol = 0
+ val traceInterval = 5 * 60 * 1000L
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .forEach { vmFile ->
+ println(vmFile)
+ val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
+ var vmId = -1L
+ var cores = -1
+ var requiredMemory = -1L
+ var startTime = -1L
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the trace
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEachIndexed { idx, line ->
+ val values = line.split(";\t")
+
+ // Parse GWF header
+ if (idx == 0) {
+ val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
+ timestampCol = header["Timestamp [ms]"]!!
+ coreCol = header["CPU cores"]!!
+ cpuUsageCol = header["CPU usage [MHZ]"]!!
+ provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!!
+ return@forEachIndexed
+ }
+
+ vmId = vmFile.nameWithoutExtension.trim().toLong()
+ startTime = min(startTime, values[timestampCol].trim().toLong() - 5 * 60)
+ cores = values[coreCol].trim().toInt()
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong()
+
+ if (flopsHistory.isEmpty()) {
+ flopsHistory.add(SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores))
+ } else {
+ if (flopsHistory.last().usage != cpuUsage) {
+ flopsHistory.add(
+ SimTraceWorkload.Fragment(
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ )
+ } else {
+ val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
+ flopsHistory.add(
+ SimTraceWorkload.Fragment(
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ )
+ }
+ }
+ }
+ }
+
+ val uuid = UUID(0L, vmId)
+
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }
+ .toSortedSet()
+ )
+
+ val workload = SimTraceWorkload(flopsHistory.asSequence())
+ entries[vmId] = TraceEntry(
+ uuid,
+ vmId.toString(),
+ startTime,
+ workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to cores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
+ )
+ }
+
+ // Create the entry iterator
+ iterator = entries.values.sortedBy { it.start }.iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
new file mode 100644
index 00000000..e68afeb7
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.gwf
+
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.io.BufferedReader
+import java.io.File
+import java.io.InputStream
+import java.util.*
+import kotlin.collections.HashSet
+import kotlin.collections.Iterator
+import kotlin.collections.List
+import kotlin.collections.MutableSet
+import kotlin.collections.component1
+import kotlin.collections.component2
+import kotlin.collections.filter
+import kotlin.collections.forEach
+import kotlin.collections.getOrPut
+import kotlin.collections.map
+import kotlin.collections.mapIndexed
+import kotlin.collections.mapOf
+import kotlin.collections.mutableMapOf
+import kotlin.collections.set
+import kotlin.collections.sortedBy
+import kotlin.collections.toMap
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more
+ * information about the format.
+ *
+ * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore
+ * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a
+ * different format for better performance.
+ *
+ * @param reader The buffered reader to read the trace with.
+ */
+public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<Job>>
+
+ /**
+ * Create a [GwfTraceReader] instance from the specified [File].
+ *
+ * @param file The file to read from.
+ */
+ public constructor(file: File) : this(file.bufferedReader())
+
+ /**
+ * Create a [GwfTraceReader] instance from the specified [InputStream].
+ *
+ * @param input The input stream to read from.
+ */
+ public constructor(input: InputStream) : this(input.bufferedReader())
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val workflows = mutableMapOf<Long, Job>()
+ val starts = mutableMapOf<Long, Long>()
+ val tasks = mutableMapOf<Long, Task>()
+ val taskDependencies = mutableMapOf<Task, List<Long>>()
+
+ var workflowIdCol = 0
+ var taskIdCol = 0
+ var submitTimeCol = 0
+ var runtimeCol = 0
+ var coreCol = 0
+ var dependencyCol = 0
+
+ try {
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the trace
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEachIndexed { idx, line ->
+ val values = line.split(",")
+
+ // Parse GWF header
+ if (idx == 0) {
+ val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
+ workflowIdCol = header["WorkflowID"]!!
+ taskIdCol = header["JobID"]!!
+ submitTimeCol = header["SubmitTime"]!!
+ runtimeCol = header["RunTime"]!!
+ coreCol = header["NProcs"]!!
+ dependencyCol = header["Dependencies"]!!
+ return@forEachIndexed
+ }
+
+ val workflowId = values[workflowIdCol].trim().toLong()
+ val taskId = values[taskIdCol].trim().toLong()
+ val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms
+ val runtime = max(0, values[runtimeCol].trim().toLong()) // s
+ val cores = values[coreCol].trim().toInt()
+ val dependencies = values[dependencyCol].split(" ")
+ .filter { it.isNotEmpty() }
+ .map { it.trim().toLong() }
+
+ val flops: Long = 4000 * runtime * cores
+
+ val workflow = workflows.getOrPut(workflowId) {
+ Job(UUID(0L, workflowId), "<unnamed>", HashSet())
+ }
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, taskId),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to cores,
+ WORKFLOW_TASK_DEADLINE to (runtime * 1000)
+ ),
+ )
+ starts.merge(workflowId, submitTime, ::min)
+ (workflow.tasks as MutableSet<Task>).add(task)
+ tasks[taskId] = task
+ taskDependencies[task] = dependencies
+ }
+ } finally {
+ reader.close()
+ }
+
+ // Fix dependencies and dependents for all tasks
+ taskDependencies.forEach { (task, dependencies) ->
+ (task.dependencies as MutableSet<Task>).addAll(
+ dependencies.map { taskId ->
+ tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
+ }
+ )
+ }
+
+ // Create the entry iterator
+ iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
+ .sortedBy { it.start }
+ .iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<Job> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt
new file mode 100644
index 00000000..0da1f7c2
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt
@@ -0,0 +1,7 @@
+package org.opendc.format.trace.sc20
+
+internal data class PerformanceInterferenceEntry(
+ val vms: List<String>,
+ val minServerLoad: Double,
+ val performanceScore: Double
+)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
new file mode 100644
index 00000000..4267737d
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.sc20
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.opendc.format.trace.PerformanceInterferenceModelReader
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import java.io.InputStream
+import java.util.*
+import kotlin.random.Random
+
+/**
+ * A parser for the JSON performance interference setup files used for the SC20 paper.
+ *
+ * @param input The input stream to read from.
+ * @param mapper The Jackson object mapper to use.
+ */
+public class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) :
+ PerformanceInterferenceModelReader {
+ /**
+ * The computed value from the file.
+ */
+ private val items: Map<String, TreeSet<PerformanceInterferenceModel.Item>>
+
+ init {
+ val entries: List<PerformanceInterferenceEntry> = mapper.readValue(input)
+ val res = mutableMapOf<String, TreeSet<PerformanceInterferenceModel.Item>>()
+ for (entry in entries) {
+ val item = PerformanceInterferenceModel.Item(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore)
+ for (workload in entry.vms) {
+ res.computeIfAbsent(workload) { TreeSet() }.add(item)
+ }
+ }
+
+ items = res
+ }
+
+ override fun construct(random: Random): Map<String, PerformanceInterferenceModel> {
+ return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) }
+ }
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
new file mode 100644
index 00000000..1eb4bac2
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.sc20
+
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.*
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param traceDirectory The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+public class Sc20TraceReader(
+ traceDirectory: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ selectedVms: List<String>,
+ random: Random
+) : TraceReader<SimWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>()
+
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ val vms = if (selectedVms.isEmpty()) {
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ } else {
+ selectedVms.map {
+ File(traceDirectory, it)
+ }
+ }
+
+ vms
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var timestamp: Long
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the trace
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEach { line ->
+ val values = line.split(" ")
+
+ vmId = vmFile.name
+ timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ }
+ }
+
+ val flopsFragments = sequence {
+ var last: SimTraceWorkload.Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) {
+ val oldFragment = last!!
+ SimTraceWorkload.Fragment(
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores)
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+ }
+
+ val uuid = UUID(0, idx.toLong())
+
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(),
+ Random(random.nextInt())
+ )
+ val workload = SimTraceWorkload(flopsFragments.asSequence())
+ entries[uuid] = TraceEntry(
+ uuid,
+ vmId,
+ minTime,
+ workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to cores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
+ )
+ }
+
+ // Create the entry iterator
+ iterator = entries.values.sortedBy { it.start }.iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt
new file mode 100644
index 00000000..61bdea60
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.sc20
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.opendc.format.trace.VmPlacementReader
+import java.io.InputStream
+
+/**
+ * A parser for the JSON VM placement data files used for the SC20 paper.
+ *
+ * @param input The input stream to read from.
+ * @param mapper The Jackson object mapper to use.
+ */
+public class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) :
+ VmPlacementReader {
+ /**
+ * The environment that was read from the file.
+ */
+ private val placements = mapper.readValue<Map<String, String>>(input)
+
+ override fun construct(): Map<String, String> {
+ return placements
+ .mapKeys { "vm__workload__${it.key}.txt" }
+ .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00
+ }
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
new file mode 100644
index 00000000..0d1f3cea
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.swf
+
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.*
+
+/**
+ * A [TraceReader] for reading SWF traces into VM-modeled workloads.
+ *
+ * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html
+ *
+ * @param file The trace file.
+ */
+public class SwfTraceReader(
+ file: File,
+ maxNumCores: Int = -1
+) : TraceReader<SimWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
+
+ val jobNumberCol = 0
+ val submitTimeCol = 1 // seconds (begin of trace is 0)
+ val waitTimeCol = 2 // seconds
+ val runTimeCol = 3 // seconds
+ val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time
+ val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified)
+
+ val sliceDuration = 5 * 60L
+
+ var jobNumber: Long
+ var submitTime: Long
+ var waitTime: Long
+ var runTime: Long
+ var cores: Int
+ var memory: Long
+ var slicedWaitTime: Long
+ var flopsPerSecond: Long
+ var flopsPartialSlice: Long
+ var runtimePartialSliceRemainder: Long
+
+ BufferedReader(FileReader(file)).use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the trace
+ !line.startsWith(";") && line.isNotBlank()
+ }
+ .forEach { line ->
+ val values = line.trim().split("\\s+".toRegex())
+
+ jobNumber = values[jobNumberCol].trim().toLong()
+ submitTime = values[submitTimeCol].trim().toLong()
+ waitTime = values[waitTimeCol].trim().toLong()
+ runTime = values[runTimeCol].trim().toLong()
+ cores = values[numAllocatedCoresCol].trim().toInt()
+ memory = values[requestedMemoryCol].trim().toLong()
+
+ if (maxNumCores != -1 && cores > maxNumCores) {
+ println("Skipped a task due to processor count ($cores > $maxNumCores).")
+ return@forEach
+ }
+
+ if (memory == -1L) {
+ memory = 1000L * cores // assume 1GB of memory per processor if not specified
+ } else {
+ memory /= 1000 // convert KB to MB
+ }
+
+ val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
+
+ // Insert waiting time slices
+
+ // We ignore wait time remainders under one
+ slicedWaitTime = 0L
+ if (waitTime >= sliceDuration) {
+ for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) {
+ flopsHistory.add(
+ SimTraceWorkload.Fragment(
+ sliceDuration * 1000L,
+ 0.0,
+ cores
+ )
+ )
+ slicedWaitTime += sliceDuration
+ }
+ }
+
+ // Insert run time slices
+
+ flopsPerSecond = 4_000L * cores
+ runtimePartialSliceRemainder = runTime % sliceDuration
+ flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder
+
+ for (
+ tick in (submitTime + slicedWaitTime)
+ until (submitTime + slicedWaitTime + runTime - sliceDuration)
+ step sliceDuration
+ ) {
+ flopsHistory.add(
+ SimTraceWorkload.Fragment(
+ sliceDuration * 1000L,
+ 1.0,
+ cores
+ )
+ )
+ }
+
+ if (runtimePartialSliceRemainder > 0) {
+ flopsHistory.add(
+ SimTraceWorkload.Fragment(
+ sliceDuration,
+ runtimePartialSliceRemainder / sliceDuration.toDouble(),
+ cores
+ )
+ )
+ }
+
+ val uuid = UUID(0L, jobNumber)
+ val workload = SimTraceWorkload(flopsHistory.asSequence())
+ entries[jobNumber] = TraceEntry(
+ uuid,
+ jobNumber.toString(),
+ submitTime,
+ workload,
+ mapOf(
+ "cores" to cores,
+ "required-memory" to memory,
+ "workload" to workload
+ )
+ )
+ }
+ }
+
+ // Create the entry iterator
+ iterator = entries.values.sortedBy { it.start }.iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
+
+ override fun close() {}
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
new file mode 100644
index 00000000..feadf61f
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.wtf
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.util.UUID
+import kotlin.math.min
+
+/**
+ * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive
+ * (https://wta.atlarge-research.com/) for more information about the format.
+ *
+ * @param path The path to the trace.
+ */
+public class WtfTraceReader(path: String) : TraceReader<Job> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<Job>>
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val workflows = mutableMapOf<Long, Job>()
+ val starts = mutableMapOf<Long, Long>()
+ val tasks = mutableMapOf<Long, Task>()
+ val taskDependencies = mutableMapOf<Task, List<Long>>()
+
+ @Suppress("DEPRECATION")
+ val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
+
+ while (true) {
+ val nextRecord = reader.read() ?: break
+
+ val workflowId = nextRecord.get("workflow_id") as Long
+ val taskId = nextRecord.get("id") as Long
+ val submitTime = nextRecord.get("ts_submit") as Long
+ val runtime = nextRecord.get("runtime") as Long
+ val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
+ @Suppress("UNCHECKED_CAST")
+ val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
+ it.get("item") as Long
+ }
+
+ val flops: Long = 4100 * (runtime / 1000) * cores
+
+ val workflow = workflows.getOrPut(workflowId) {
+ Job(UUID(0L, workflowId), "<unnamed>", HashSet())
+ }
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, taskId),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to cores,
+ WORKFLOW_TASK_DEADLINE to runtime
+ )
+ )
+
+ starts.merge(workflowId, submitTime, ::min)
+ (workflow.tasks as MutableSet<Task>).add(task)
+ tasks[taskId] = task
+ taskDependencies[task] = dependencies
+ }
+
+ // Fix dependencies and dependents for all tasks
+ taskDependencies.forEach { (task, dependencies) ->
+ (task.dependencies as MutableSet<Task>).addAll(
+ dependencies.map { taskId ->
+ tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
+ }
+ )
+ }
+
+ // Create the entry iterator
+ iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
+ .sortedBy { it.start }
+ .iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<Job> = iterator.next()
+
+ override fun close() {}
+}