From 55a4c8208cc44ac626f7b8c61a19d5ec725ec936 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 11:48:18 +0200 Subject: refactor(trace): Unify columns of different tables This change unifies columns of different tables used by trace formats. This concretely means that instead of having columns specific per table (e.g., RESOURCE_ID and RESOURCE_STATE_ID), with this changes these columns are shared between the tables with a single definition (RESOURCE_ID). --- .../main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index c92b212f..8a2585ce 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -62,10 +62,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) { return try { while (reader.nextRow()) { - val id = reader.get(RESOURCE_STATE_ID) + val id = reader.get(RESOURCE_ID) val time = reader.get(RESOURCE_STATE_TIMESTAMP) val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cores = reader.getInt(RESOURCE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val fragment = SimTraceWorkload.Fragment( -- cgit v1.2.3 From e765316cfc089dd50602759ea7afbcc7a3cd4c00 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:17:49 +0200 Subject: perf(compute): Use index lookup in trace loader This change updates the ComputeWorkloadLoader to use index column lookups in order to prevent having to lookup the index for every row. --- .../compute/workload/ComputeWorkloadLoader.kt | 34 +++++++++++++++------- 1 file changed, 24 insertions(+), 10 deletions(-) (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 8a2585ce..ab7f051f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -27,6 +27,8 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.trace.* import org.opendc.trace.opendc.OdcVmTraceFormat import java.io.File +import java.time.Duration +import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap import kotlin.math.roundToLong @@ -58,15 +60,21 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private fun parseFragments(trace: Trace): Map> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val durationCol = reader.resolve(RESOURCE_STATE_DURATION) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + val fragments = mutableMapOf>() return try { while (reader.nextRow()) { - val id = reader.get(RESOURCE_ID) - val time = reader.get(RESOURCE_STATE_TIMESTAMP) - val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_CPU_COUNT) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) + val id = reader.get(idCol) as String + val time = reader.get(timestampCol) as Instant + val duration = reader.get(durationCol) as Duration + val cores = reader.getInt(coresCol) + val cpuUsage = reader.getDouble(usageCol) val fragment = SimTraceWorkload.Fragment( time.toEpochMilli(), @@ -90,21 +98,27 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private fun parseMeta(trace: Trace, fragments: Map>): List { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + val idCol = reader.resolve(RESOURCE_ID) + val startTimeCol = reader.resolve(RESOURCE_START_TIME) + val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) + var counter = 0 val entries = mutableListOf() return try { while (reader.nextRow()) { - val id = reader.get(RESOURCE_ID) + val id = reader.get(idCol) as String if (!fragments.containsKey(id)) { continue } - val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_STOP_TIME) - val maxCores = reader.getInt(RESOURCE_CPU_COUNT) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB + val submissionTime = reader.get(startTimeCol) as Instant + val endTime = reader.get(stopTimeCol) as Instant + val maxCores = reader.getInt(coresCol) + val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() -- cgit v1.2.3 From c7fff03408ee3109d0a39a96c043584a2d8f67ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 22:04:23 +0200 Subject: refactor(trace): Simplify TraceFormat SPI interface This change simplifies the TraceFormat SPI interface by reducing the number of interfaces that implementors need to implement to only TraceFormat. --- .../kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index ab7f051f..6dba41e6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -25,7 +25,6 @@ package org.opendc.compute.workload import mu.KotlinLogging import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.trace.* -import org.opendc.trace.opendc.OdcVmTraceFormat import java.io.File import java.time.Duration import java.time.Instant @@ -44,11 +43,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { */ private val logger = KotlinLogging.logger {} - /** - * The [OdcVmTraceFormat] instance to load the traces - */ - private val format = OdcVmTraceFormat() - /** * The cache of workloads. */ @@ -159,7 +153,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { logger.info { "Loading trace $it at $path" } - val trace = format.open(path.toURI().toURL()) + val trace = Trace.open(path, format = "opendc-vm") val fragments = parseFragments(trace) parseMeta(trace, fragments) } -- cgit v1.2.3 From 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 21 Sep 2021 11:34:34 +0200 Subject: feat(trace): Add support for writing traces This change adds a new API for writing traces in a trace format. Currently, writing is only supported by the OpenDC VM format, but over time the other formats will also have support for writing added. --- .../opendc-compute-workload/build.gradle.kts | 2 +- .../opendc/compute/workload/ComputeSchedulers.kt | 86 ++++++++++++++++++++++ .../compute/workload/ComputeWorkloadLoader.kt | 6 +- .../opendc/compute/workload/ComputeWorkloads.kt | 2 +- .../workload/internal/TraceComputeWorkload.kt | 4 +- 5 files changed, 93 insertions(+), 7 deletions(-) create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index e82cf203..28a5e1da 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcTrace.opendcTraceOpendc) + implementation(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt new file mode 100644 index 00000000..c94f30e4 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.compute.workload + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +public fun createComputeScheduler(name: String, seeder: Random, placements: Map = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (name) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(placements) + else -> throw IllegalArgumentException("Unknown policy $name") + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 6dba41e6..7c579e39 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -145,15 +145,15 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } /** - * Load the trace with the specified [name]. + * Load the trace with the specified [name] and [format]. */ - public fun get(name: String): List { + public fun get(name: String, format: String): List { return cache.computeIfAbsent(name) { val path = baseDir.resolve(it) logger.info { "Loading trace $it at $path" } - val trace = Trace.open(path, format = "opendc-vm") + val trace = Trace.open(path, format) val fragments = parseFragments(trace) parseMeta(trace, fragments) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt index f58ce587..2f4935ca 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -31,7 +31,7 @@ import org.opendc.compute.workload.internal.TraceComputeWorkload /** * Construct a workload from a trace. */ -public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name) +public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) /** * Construct a composite workload with the specified fractions. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index d657ff01..c20cb8f3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -30,8 +30,8 @@ import java.util.* /** * A [ComputeWorkload] from a trace. */ -internal class TraceComputeWorkload(val name: String) : ComputeWorkload { +internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { override fun resolve(loader: ComputeWorkloadLoader, random: Random): List { - return loader.get(name) + return loader.get(name, format) } } -- cgit v1.2.3