summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-21 12:04:15 +0200
committerGitHub <noreply@github.com>2021-09-21 12:04:15 +0200
commit322d91db03a7d74a00ec623ce624f979c0b77c03 (patch)
tree73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-compute
parent453c25c4b453fa0af26bebbd8863abfb79218119 (diff)
parent68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (diff)
merge: Add support for trace writing
This pull request extends the trace API to support writing new traces. - Unify columns of different tables - Support column lookup via index - Use index lookup in trace loader - Add property for describing partition keys - Simplify TraceFormat SPI interface - Add support for writing traces **Breaking API Changes** - `TraceFormat` SPI interface has been redesigned.
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts2
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt86
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt46
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt2
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt4
5 files changed, 117 insertions, 23 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
index e82cf203..28a5e1da 100644
--- a/opendc-compute/opendc-compute-workload/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -32,7 +32,7 @@ dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcTrace.opendcTraceOpendc)
+ implementation(projects.opendcTrace.opendcTraceApi)
implementation(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt
new file mode 100644
index 00000000..c94f30e4
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeSchedulers")
+package org.opendc.compute.workload
+
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (name) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(placements)
+ else -> throw IllegalArgumentException("Unknown policy $name")
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index c92b212f..7c579e39 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -25,8 +25,9 @@ package org.opendc.compute.workload
import mu.KotlinLogging
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.trace.*
-import org.opendc.trace.opendc.OdcVmTraceFormat
import java.io.File
+import java.time.Duration
+import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.roundToLong
@@ -43,11 +44,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
private val logger = KotlinLogging.logger {}
/**
- * The [OdcVmTraceFormat] instance to load the traces
- */
- private val format = OdcVmTraceFormat()
-
- /**
* The cache of workloads.
*/
private val cache = ConcurrentHashMap<String, List<VirtualMachine>>()
@@ -58,15 +54,21 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+ val idCol = reader.resolve(RESOURCE_ID)
+ val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
+ val durationCol = reader.resolve(RESOURCE_STATE_DURATION)
+ val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE)
+
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
return try {
while (reader.nextRow()) {
- val id = reader.get(RESOURCE_STATE_ID)
- val time = reader.get(RESOURCE_STATE_TIMESTAMP)
- val duration = reader.get(RESOURCE_STATE_DURATION)
- val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
+ val id = reader.get(idCol) as String
+ val time = reader.get(timestampCol) as Instant
+ val duration = reader.get(durationCol) as Duration
+ val cores = reader.getInt(coresCol)
+ val cpuUsage = reader.getDouble(usageCol)
val fragment = SimTraceWorkload.Fragment(
time.toEpochMilli(),
@@ -90,21 +92,27 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
+ val idCol = reader.resolve(RESOURCE_ID)
+ val startTimeCol = reader.resolve(RESOURCE_START_TIME)
+ val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
+ val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val memCol = reader.resolve(RESOURCE_MEM_CAPACITY)
+
var counter = 0
val entries = mutableListOf<VirtualMachine>()
return try {
while (reader.nextRow()) {
- val id = reader.get(RESOURCE_ID)
+ val id = reader.get(idCol) as String
if (!fragments.containsKey(id)) {
continue
}
- val submissionTime = reader.get(RESOURCE_START_TIME)
- val endTime = reader.get(RESOURCE_STOP_TIME)
- val maxCores = reader.getInt(RESOURCE_CPU_COUNT)
- val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
+ val submissionTime = reader.get(startTimeCol) as Instant
+ val endTime = reader.get(stopTimeCol) as Instant
+ val maxCores = reader.getInt(coresCol)
+ val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
@@ -137,15 +145,15 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
}
/**
- * Load the trace with the specified [name].
+ * Load the trace with the specified [name] and [format].
*/
- public fun get(name: String): List<VirtualMachine> {
+ public fun get(name: String, format: String): List<VirtualMachine> {
return cache.computeIfAbsent(name) {
val path = baseDir.resolve(it)
logger.info { "Loading trace $it at $path" }
- val trace = format.open(path.toURI().toURL())
+ val trace = Trace.open(path, format)
val fragments = parseFragments(trace)
parseMeta(trace, fragments)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
index f58ce587..2f4935ca 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
@@ -31,7 +31,7 @@ import org.opendc.compute.workload.internal.TraceComputeWorkload
/**
* Construct a workload from a trace.
*/
-public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name)
+public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format)
/**
* Construct a composite workload with the specified fractions.
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index d657ff01..c20cb8f3 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -30,8 +30,8 @@ import java.util.*
/**
* A [ComputeWorkload] from a trace.
*/
-internal class TraceComputeWorkload(val name: String) : ComputeWorkload {
+internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- return loader.get(name)
+ return loader.get(name, format)
}
}