From 8a9f5573bef3f68316add17c04a47cc4e5fe75fa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Oct 2020 00:49:53 +0200 Subject: Move OpenDC modules into simulator root This change moves the OpenDC modules previously living in the simulator/opendc directory to the simulator directory itself given that we do not make a distinction between OpenDC and odcsim anymore. --- .../opendc/format/environment/EnvironmentReader.kt | 40 ++++ .../opendc/format/environment/sc18/Model.kt | 44 +++++ .../environment/sc18/Sc18EnvironmentReader.kt | 112 +++++++++++ .../opendc/format/environment/sc20/Model.kt | 45 +++++ .../sc20/Sc20ClusterEnvironmentReader.kt | 147 +++++++++++++++ .../environment/sc20/Sc20EnvironmentReader.kt | 123 ++++++++++++ .../trace/PerformanceInterferenceModelReader.kt | 39 ++++ .../com/atlarge/opendc/format/trace/TraceEntry.kt | 54 ++++++ .../com/atlarge/opendc/format/trace/TraceReader.kt | 37 ++++ .../com/atlarge/opendc/format/trace/TraceWriter.kt | 45 +++++ .../opendc/format/trace/VmPlacementReader.kt | 37 ++++ .../format/trace/bitbrains/BitbrainsTraceReader.kt | 175 +++++++++++++++++ .../opendc/format/trace/gwf/GwfTraceReader.kt | 172 +++++++++++++++++ .../trace/sc20/PerformanceInterferenceEntry.kt | 7 + .../sc20/Sc20PerformanceInterferenceReader.kt | 68 +++++++ .../opendc/format/trace/sc20/Sc20TraceReader.kt | 210 +++++++++++++++++++++ .../format/trace/sc20/Sc20VmPlacementReader.kt | 53 ++++++ .../opendc/format/trace/swf/SwfTraceReader.kt | 208 ++++++++++++++++++++ 18 files changed, 1616 insertions(+) create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Model.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/VmPlacementReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20VmPlacementReader.kt create mode 100644 simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt (limited to 'simulator/opendc-format/src/main') diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt new file mode 100644 index 00000000..570b936d --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment + +import com.atlarge.opendc.core.Environment +import kotlinx.coroutines.CoroutineScope +import java.io.Closeable +import java.time.Clock + +/** + * An interface for reading descriptions of topology environments into memory as [Environment]. + */ +interface EnvironmentReader : Closeable { + /** + * Construct an [Environment] in the specified [CoroutineScope]. + */ + suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt new file mode 100644 index 00000000..f3e70982 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.format.environment.sc18 + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo + +/** + * A topology setup. + * + * @property name The name of the setup. + * @property rooms The rooms in the topology. + */ +internal data class Setup(val name: String, val rooms: List) + +/** + * A room in a topology. + * + * @property type The type of room in the topology. + * @property objects The objects in the room. + */ +internal data class Room(val type: String, val objects: List) + +/** + * An object in a [Room]. + * + * @property type The type of the room object. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) +internal sealed class RoomObject(val type: String) { + /** + * A rack in a server room. + * + * @property machines The machines in the rack. + */ + internal data class Rack(val machines: List) : RoomObject("RACK") +} + +/** + * A machine in the setup that consists of the specified CPU's represented as + * integer identifiers and ethernet speed. + * + * @property cpus The CPUs in the machine represented as integer identifiers. + */ +internal data class Machine(val cpus: List) diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt new file mode 100644 index 00000000..188d9fd8 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -0,0 +1,112 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment.sc18 + +import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.CoroutineScope +import java.io.InputStream +import java.time.Clock +import java.util.UUID + +/** + * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology + * Schedulers". + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { + /** + * The environment that was read from the file. + */ + private val setup: Setup = mapper.readValue(input) + + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + var counter = 0 + val nodes = setup.rooms.flatMap { room -> + room.objects.flatMap { roomObject -> + when (roomObject) { + is RoomObject.Rack -> { + roomObject.machines.map { machine -> + val cores = machine.cpus.flatMap { id -> + when (id) { + 1 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } + } + 2 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } + } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + SimpleBareMetalDriver( + coroutineScope, + clock, + UUID.randomUUID(), + "node-${counter++}", + emptyMap(), + cores, + listOf(MemoryUnit("", "", 2300.0, 16000)) + ) + } + } + } + } + } + + val provisioningService = SimpleProvisioningService() + for (node in nodes) { + provisioningService.create(node) + } + + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) + val platform = Platform( + UUID.randomUUID(), + "sc18-platform", + listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment(setup.name, null, listOf(platform)) + } + + override fun close() {} +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Model.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Model.kt new file mode 100644 index 00000000..0a8f1c14 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Model.kt @@ -0,0 +1,45 @@ +package com.atlarge.opendc.format.environment.sc20 + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo + +/** + * A topology setup. + * + * @property name The name of the setup. + * @property rooms The rooms in the topology. + */ +internal data class Setup(val name: String, val rooms: List) + +/** + * A room in a topology. + * + * @property type The type of room in the topology. + * @property objects The objects in the room. + */ +internal data class Room(val type: String, val objects: List) + +/** + * An object in a [Room]. + * + * @property type The type of the room object. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) +internal sealed class RoomObject(val type: String) { + /** + * A rack in a server room. + * + * @property machines The machines in the rack. + */ + internal data class Rack(val machines: List) : RoomObject("RACK") +} + +/** + * A machine in the setup that consists of the specified CPU's represented as + * integer identifiers and ethernet speed. + * + * @property cpus The CPUs in the machine represented as integer identifiers. + * @property memories The memories in the machine represented as integer identifiers. + */ +internal data class Machine(val cpus: List, val memories: List) diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt new file mode 100644 index 00000000..d7845081 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -0,0 +1,147 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment.sc20 + +import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.format.environment.EnvironmentReader +import kotlinx.coroutines.CoroutineScope +import java.io.File +import java.io.FileInputStream +import java.io.InputStream +import java.time.Clock +import java.util.Random +import java.util.UUID + +/** + * A [EnvironmentReader] for the internal environment format. + * + * @param environmentFile The file describing the physical cluster. + */ +class Sc20ClusterEnvironmentReader( + private val input: InputStream +) : EnvironmentReader { + + constructor(file: File) : this(FileInputStream(file)) + + @Suppress("BlockingMethodInNonBlockingContext") + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + var clusterIdCol = 0 + var speedCol = 0 + var numberOfHostsCol = 0 + var memoryPerHostCol = 0 + var coresPerHostCol = 0 + + var clusterIdx: Int = 0 + var clusterId: String + var speed: Double + var numberOfHosts: Int + var memoryPerHost: Long + var coresPerHost: Int + + val nodes = mutableListOf() + val random = Random(0) + + input.bufferedReader().use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the file + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";") + + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + clusterIdCol = header["ClusterID"]!! + speedCol = header["Speed"]!! + numberOfHostsCol = header["numberOfHosts"]!! + memoryPerHostCol = header["memoryCapacityPerHost"]!! + coresPerHostCol = header["coreCountPerHost"]!! + return@forEachIndexed + } + + clusterIdx++ + clusterId = values[clusterIdCol].trim() + speed = values[speedCol].trim().toDouble() * 1000.0 + numberOfHosts = values[numberOfHostsCol].trim().toInt() + memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L + coresPerHost = values[coresPerHostCol].trim().toInt() + + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + + repeat(numberOfHosts) { + nodes.add( + SimpleBareMetalDriver( + coroutineScope, + clock, + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$it", + mapOf(NODE_CLUSTER to clusterId), + List(coresPerHost) { coreId -> + ProcessingUnit(unknownProcessingNode, coreId, speed) + }, + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + listOf(unknownMemoryUnit), + LinearLoadPowerModel(200.0, 350.0) + ) + ) + } + } + } + + val provisioningService = SimpleProvisioningService() + for (node in nodes) { + provisioningService.create(node) + } + + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) + + val platform = Platform( + UUID.randomUUID(), + "sc20-platform", + listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment("SC20 Environment", null, listOf(platform)) + } + + override fun close() {} +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt new file mode 100644 index 00000000..adfa1cf0 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -0,0 +1,123 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment.sc20 + +import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.CoroutineScope +import java.io.InputStream +import java.time.Clock +import java.util.UUID + +/** + * A parser for the JSON experiment setup files used for the SC20 paper. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { + /** + * The environment that was read from the file. + */ + private val setup: Setup = mapper.readValue(input) + + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + var counter = 0 + val nodes = setup.rooms.flatMap { room -> + room.objects.flatMap { roomObject -> + when (roomObject) { + is RoomObject.Rack -> { + roomObject.machines.map { machine -> + val cores = machine.cpus.flatMap { id -> + when (id) { + 1 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } + } + 2 -> { + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } + } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + val memories = machine.memories.map { id -> + when (id) { + 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + SimpleBareMetalDriver( + coroutineScope, + clock, + UUID.randomUUID(), + "node-${counter++}", + emptyMap(), + cores, + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + memories, + LinearLoadPowerModel(200.0, 350.0) + ) + } + } + } + } + } + + val provisioningService = SimpleProvisioningService() + for (node in nodes) { + provisioningService.create(node) + } + + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) + + val platform = Platform( + UUID.randomUUID(), + "sc20-platform", + listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment(setup.name, null, listOf(platform)) + } + + override fun close() {} +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt new file mode 100644 index 00000000..407bc0b4 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import java.io.Closeable +import kotlin.random.Random + +/** + * An interface for reading descriptions of performance interference models into memory. + */ +interface PerformanceInterferenceModelReader : Closeable { + /** + * Construct a [PerformanceInterferenceModel]. + */ + fun construct(random: Random): Map +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt new file mode 100644 index 00000000..d4ad33f7 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.core.workload.Workload + +/** + * An entry in a workload trace. + * + * @param T The shape of the workload in this entry. + */ +interface TraceEntry { + /** + * The time of submission of the workload. + */ + val submissionTime: Long + + /** + * The workload in this trace entry. + */ + val workload: T + + /** + * Extract the submission time from this entry. + */ + operator fun component1() = submissionTime + + /** + * Extract the workload from this entry. + */ + operator fun component2() = workload +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt new file mode 100644 index 00000000..6d29cdb4 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.core.workload.Workload +import java.io.Closeable + +/** + * An interface for reading [Workload]s into memory. + * + * This interface must guarantee that the entries are delivered in order of submission time. + * + * @param T The shape of the workloads supported by this reader. + */ +interface TraceReader : Iterator>, Closeable diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt new file mode 100644 index 00000000..94ee6f31 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.core.workload.Workload +import java.io.Closeable + +/** + * An interface for persisting workload traces (e.g. to disk). + * + * @param T The type of [Workload] supported by this writer. + */ +interface TraceWriter : Closeable { + /** + * Write an entry to the trace. + * + * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException]. + * + * @param submissionTime The time of submission of the workload. + * @param workload The workload to write to the trace. + */ + fun write(submissionTime: Long, workload: T) +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/VmPlacementReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/VmPlacementReader.kt new file mode 100644 index 00000000..7caebb76 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/VmPlacementReader.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import java.io.Closeable + +/** + * An interface for reading VM placement data into memory. + */ +interface VmPlacementReader : Closeable { + /** + * Construct a map of VMs to clusters. + */ + fun construct(): Map +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt new file mode 100644 index 00000000..6ee43b6a --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -0,0 +1,175 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.bitbrains + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for the public VM workload trace format. + * + * @param traceDirectory The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +class BitbrainsTraceReader( + traceDirectory: File, + performanceInterferenceModel: PerformanceInterferenceModel +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + + var timestampCol = 0 + var coreCol = 0 + var cpuUsageCol = 0 + var provisionedMemoryCol = 0 + val traceInterval = 5 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .forEach { vmFile -> + println(vmFile) + val flopsHistory = mutableListOf() + var vmId = -1L + var cores = -1 + var requiredMemory = -1L + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";\t") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + timestampCol = header["Timestamp [ms]"]!! + coreCol = header["CPU cores"]!! + cpuUsageCol = header["CPU usage [MHZ]"]!! + provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!! + return@forEachIndexed + } + + vmId = vmFile.nameWithoutExtension.trim().toLong() + val timestamp = values[timestampCol].trim().toLong() - 5 * 60 + cores = values[coreCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() + + val flops: Long = (cpuUsage * 5 * 60 * cores).toLong() + + if (flopsHistory.isEmpty()) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + } else { + if (flopsHistory.last().flops != flops) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + } else { + val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) + flopsHistory.add( + FlopsHistoryFragment( + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + ) + } + } + } + } + + val uuid = UUID(0L, vmId) + + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSortedSet() + ) + + val vmWorkload = VmWorkload( + uuid, + "VM Workload $vmId", + UnnamedUser, + VmImage( + uuid, + vmId.toString(), + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + flopsHistory.asSequence(), + cores, + requiredMemory + ) + ) + entries[vmId] = TraceEntryImpl( + flopsHistory.firstOrNull()?.tick ?: -1, + vmWorkload + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt new file mode 100644 index 00000000..6db3975e --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -0,0 +1,172 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.gwf + +import com.atlarge.opendc.compute.core.image.FlopsApplicationImage +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import java.io.BufferedReader +import java.io.File +import java.io.InputStream +import java.util.UUID +import kotlin.math.max +import kotlin.math.min + +/** + * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more + * information about the format. + * + * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore + * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a + * different format for better performance. + * + * @param reader The buffered reader to read the trace with. + */ +class GwfTraceReader(reader: BufferedReader) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Create a [GwfTraceReader] instance from the specified [File]. + * + * @param file The file to read from. + */ + constructor(file: File) : this(file.bufferedReader()) + + /** + * Create a [GwfTraceReader] instance from the specified [InputStream]. + * + * @param input The input stream to read from. + */ + constructor(input: InputStream) : this(input.bufferedReader()) + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf() + val tasks = mutableMapOf() + val taskDependencies = mutableMapOf>() + + var workflowIdCol = 0 + var taskIdCol = 0 + var submitTimeCol = 0 + var runtimeCol = 0 + var coreCol = 0 + var dependencyCol = 0 + + try { + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(",") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + workflowIdCol = header["WorkflowID"]!! + taskIdCol = header["JobID"]!! + submitTimeCol = header["SubmitTime"]!! + runtimeCol = header["RunTime"]!! + coreCol = header["NProcs"]!! + dependencyCol = header["Dependencies"]!! + return@forEachIndexed + } + + val workflowId = values[workflowIdCol].trim().toLong() + val taskId = values[taskIdCol].trim().toLong() + val submitTime = values[submitTimeCol].trim().toLong() + val runtime = max(0, values[runtimeCol].trim().toLong()) + val cores = values[coreCol].trim().toInt() + val dependencies = values[dependencyCol].split(" ") + .filter { it.isNotEmpty() } + .map { it.trim().toLong() } + + val flops: Long = 4000 * runtime * cores + + val entry = entries.getOrPut(workflowId) { + TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "", UnnamedUser, HashSet())) + } + val workflow = entry.workload + val task = Task( + UUID(0L, taskId), + "", + FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), flops, cores), + HashSet(), + mapOf(WORKFLOW_TASK_DEADLINE to runtime) + ) + entry.submissionTime = min(entry.submissionTime, submitTime) + (workflow.tasks as MutableSet).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } + } finally { + reader.close() + } + + // Fix dependencies and dependents for all tasks + taskDependencies.forEach { (task, dependencies) -> + (task.dependencies as MutableSet).addAll( + dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + } + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: Job + ) : TraceEntry +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt new file mode 100644 index 00000000..ade47e1b --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt @@ -0,0 +1,7 @@ +package com.atlarge.opendc.format.trace.sc20 + +internal data class PerformanceInterferenceEntry( + val vms: List, + val minServerLoad: Double, + val performanceScore: Double +) diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt new file mode 100644 index 00000000..0e8e1fd2 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.sc20 + +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem +import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream +import java.util.TreeSet +import kotlin.random.Random + +/** + * A parser for the JSON performance interference setup files used for the SC20 paper. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : + PerformanceInterferenceModelReader { + /** + * The computed value from the file. + */ + private val items: Map> + + init { + val entries: List = mapper.readValue(input) + val res = mutableMapOf>() + for (entry in entries) { + val item = PerformanceInterferenceModelItem(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore) + for (workload in entry.vms) { + res.computeIfAbsent(workload) { TreeSet() }.add(item) + } + } + + items = res + } + + override fun construct(random: Random): Map { + return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) } + } + + override fun close() {} +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt new file mode 100644 index 00000000..28dc7793 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -0,0 +1,210 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.sc20 + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID +import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceDirectory The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +class Sc20TraceReader( + traceDirectory: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List, + random: Random +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val vmIdCol = 19 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + val vms = if (selectedVms.isEmpty()) { + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + } else { + selectedVms.map { + File(traceDirectory, it) + } + } + + vms + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEach { line -> + val values = line.split(" ") + + vmId = vmFile.name + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + } + } + + val flopsFragments = sequence { + var last: FlopsHistoryFragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + FlopsHistoryFragment( + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + + if (last != null) { + yield(last!!) + } + } + } + + val uuid = UUID(0, idx.toLong()) + + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uuid, + "VM Workload $vmId", + UnnamedUser, + VmImage( + uuid, + vmId, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + flopsFragments.asSequence(), + maxCores, + requiredMemory + ) + ) + entries[uuid] = TraceEntryImpl( + minTime, + vmWorkload + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20VmPlacementReader.kt new file mode 100644 index 00000000..5295ae03 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20VmPlacementReader.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.sc20 + +import com.atlarge.opendc.format.trace.VmPlacementReader +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream + +/** + * A parser for the JSON VM placement data files used for the SC20 paper. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : + VmPlacementReader { + /** + * The environment that was read from the file. + */ + private val placements = mapper.readValue>(input) + + override fun construct(): Map { + return placements + .mapKeys { "vm__workload__${it.key}.txt" } + .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 + } + + override fun close() {} +} diff --git a/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt new file mode 100644 index 00000000..f7c74562 --- /dev/null +++ b/simulator/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReader.kt @@ -0,0 +1,208 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.swf + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for reading SWF traces into VM-modeled workloads. + * + * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html + * + * @param file The trace file. + */ +class SwfTraceReader( + file: File, + maxNumCores: Int = -1 +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + + val jobNumberCol = 0 + val submitTimeCol = 1 // seconds (begin of trace is 0) + val waitTimeCol = 2 // seconds + val runTimeCol = 3 // seconds + val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time + val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified) + + val sliceDuration = 5 * 60L + + var jobNumber = -1L + var submitTime = -1L + var waitTime = -1L + var runTime = -1L + var cores = -1 + var memory = -1L + var slicedWaitTime = -1L + var flopsPerSecond = -1L + var flopsPartialSlice = -1L + var flopsFullSlice = -1L + var runtimePartialSliceRemainder = -1L + + BufferedReader(FileReader(file)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith(";") && line.isNotBlank() + } + .forEach { line -> + val values = line.trim().split("\\s+".toRegex()) + + jobNumber = values[jobNumberCol].trim().toLong() + submitTime = values[submitTimeCol].trim().toLong() + waitTime = values[waitTimeCol].trim().toLong() + runTime = values[runTimeCol].trim().toLong() + cores = values[numAllocatedCoresCol].trim().toInt() + memory = values[requestedMemoryCol].trim().toLong() + + if (maxNumCores != -1 && cores > maxNumCores) { + println("Skipped a task due to processor count ($cores > $maxNumCores).") + return@forEach + } + + if (memory == -1L) { + memory = 1000L * cores // assume 1GB of memory per processor if not specified + } else { + memory /= 1000 // convert KB to MB + } + + val flopsHistory = mutableListOf() + + // Insert waiting time slices + + // We ignore wait time remainders under one + slicedWaitTime = 0L + if (waitTime >= sliceDuration) { + for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { + flopsHistory.add( + FlopsHistoryFragment( + tick * 1000L, + 0L, + sliceDuration * 1000L, + 0.0, + cores + ) + ) + slicedWaitTime += sliceDuration + } + } + + // Insert run time slices + + flopsPerSecond = 4_000L * cores + runtimePartialSliceRemainder = runTime % sliceDuration + flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder + flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice + + for ( + tick in (submitTime + slicedWaitTime) + until (submitTime + slicedWaitTime + runTime - sliceDuration) + step sliceDuration + ) { + flopsHistory.add( + FlopsHistoryFragment( + tick * 1000L, + flopsFullSlice / sliceDuration, + sliceDuration * 1000L, + 1.0, + cores + ) + ) + } + + if (runtimePartialSliceRemainder > 0) { + flopsHistory.add( + FlopsHistoryFragment( + submitTime + (slicedWaitTime + runTime - runtimePartialSliceRemainder), + flopsPartialSlice, + sliceDuration, + runtimePartialSliceRemainder / sliceDuration.toDouble(), + cores + ) + ) + } + + val uuid = UUID(0L, jobNumber) + val vmWorkload = VmWorkload( + uuid, + "SWF Workload $jobNumber", + UnnamedUser, + VmImage( + uuid, + jobNumber.toString(), + emptyMap(), + flopsHistory.asSequence(), + cores, + memory + ) + ) + + entries[jobNumber] = TraceEntryImpl(submitTime, vmWorkload) + } + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} -- cgit v1.2.3