summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt2
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt)17
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt)81
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt34
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt62
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt11
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt49
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt66
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt143
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt61
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt)27
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt261
13 files changed, 479 insertions, 338 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
index 87903623..fcd9dd7e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -24,8 +24,8 @@ package org.opendc.compute.simulator.failure
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
+import java.util.*
import kotlin.math.roundToInt
-import kotlin.random.Random
/**
* A [VictimSelector] that stochastically selects a set of hosts to be failed.
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index b0795d61..78002c2f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -20,13 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.trace
+package org.opendc.compute.workload
+
+import java.util.*
/**
- * An interface for reading workloads into memory.
- *
- * This interface must guarantee that the entries are delivered in order of submission time.
- *
- * @param T The shape of the workloads supported by this reader.
+ * An interface that describes how a workload is resolved.
*/
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
+public interface ComputeWorkload {
+ /**
+ * Resolve the workload into a list of [VirtualMachine]s to simulate.
+ */
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index ae20482d..46176609 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -20,30 +20,42 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.trace
+package org.opendc.compute.workload
+import mu.KotlinLogging
import org.opendc.compute.workload.trace.bp.BPTraceFormat
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.trace.*
import java.io.File
-import java.util.UUID
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.math.roundToLong
/**
- * A [TraceReader] for the internal VM workload trace format.
+ * A helper class for loading compute workload traces into memory.
*
- * @param path The directory of the traces.
+ * @param baseDir The directory containing the traces.
*/
-public class RawParquetTraceReader(private val path: File) {
+public class ComputeWorkloadLoader(private val baseDir: File) {
/**
- * The [Trace] that represents this trace.
+ * The logger for this instance.
*/
- private val trace = BPTraceFormat().open(path.toURI().toURL())
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The [BPTraceFormat] instance to load the traces
+ */
+ private val format = BPTraceFormat()
+
+ /**
+ * The cache of workloads.
+ */
+ private val cache = ConcurrentHashMap<String, List<VirtualMachine>>()
/**
* Read the fragments into memory.
*/
- private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
+ private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
@@ -75,11 +87,11 @@ public class RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
var counter = 0
- val entries = mutableListOf<TraceEntry<SimWorkload>>()
+ val entries = mutableListOf<VirtualMachine>()
return try {
while (reader.nextRow()) {
@@ -96,23 +108,25 @@ public class RawParquetTraceReader(private val path: File) {
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val workload = SimTraceWorkload(vmFragments)
+ val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs
+
entries.add(
- TraceEntry(
- uid, id, submissionTime.toEpochMilli(), workload,
- mapOf(
- "submit-time" to submissionTime.toEpochMilli(),
- "end-time" to endTime.toEpochMilli(),
- "total-load" to totalLoad,
- "cores" to maxCores,
- "required-memory" to requiredMemory.toLong(),
- "workload" to workload
- )
+ VirtualMachine(
+ uid,
+ id,
+ maxCores,
+ requiredMemory.roundToLong(),
+ totalLoad,
+ submissionTime,
+ endTime,
+ vmFragments
)
)
}
+ // Make sure the virtual machines are ordered by start time
+ entries.sortBy { it.startTime }
+
entries
} catch (e: Exception) {
e.printStackTrace()
@@ -123,17 +137,24 @@ public class RawParquetTraceReader(private val path: File) {
}
/**
- * The entries in the trace.
+ * Load the trace with the specified [name].
*/
- private val entries: List<TraceEntry<SimWorkload>>
+ public fun get(name: String): List<VirtualMachine> {
+ return cache.computeIfAbsent(name) {
+ val path = baseDir.resolve(it)
+
+ logger.info { "Loading trace $it at $path" }
- init {
- val fragments = parseFragments()
- entries = parseMeta(fragments)
+ val trace = format.open(path.toURI().toURL())
+ val fragments = parseFragments(trace)
+ parseMeta(trace, fragments)
+ }
}
/**
- * Read the entries in the trace.
+ * Clear the workload cache.
*/
- public fun read(): List<TraceEntry<SimWorkload>> = entries
+ public fun reset() {
+ cache.clear()
+ }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index 6da0f49a..ed45bd8a 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -34,14 +34,13 @@ import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.compute.workload.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.compute.*
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -90,10 +89,11 @@ public class ComputeWorkloadRunner(
}
/**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*/
- public suspend fun run(reader: TraceReader<SimWorkload>) {
- val injector = failureModel?.createInjector(context, clock, service)
+ public suspend fun run(trace: List<VirtualMachine>, seed: Long) {
+ val random = Random(seed)
+ val injector = failureModel?.createInjector(context, clock, service, random)
val client = service.newClient()
// Create new image for the virtual machine
@@ -106,35 +106,36 @@ public class ComputeWorkloadRunner(
var offset = Long.MIN_VALUE
- while (reader.hasNext()) {
- val entry = reader.next()
+ for (entry in trace.sortedBy { it.startTime }) {
+ val now = clock.millis()
+ val start = entry.startTime.toEpochMilli()
if (offset < 0) {
- offset = entry.start - clock.millis()
+ offset = start - now
}
// Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
+ assert(start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (start - offset) - now))
launch {
val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
val server = client.newServer(
entry.name,
image,
client.newFlavor(
entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
+ entry.cpuCount,
+ entry.memCapacity
),
- meta = entry.meta + mapOf("workload" to workload)
+ meta = mapOf("workload" to workload)
)
// Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
+ val endTime = entry.stopTime.toEpochMilli()
+ delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
// Delete the server after reaching the end-time of the virtual machine
server.delete()
@@ -145,7 +146,6 @@ public class ComputeWorkloadRunner(
yield()
} finally {
injector?.close()
- reader.close()
client.close()
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
new file mode 100644
index 00000000..f58ce587
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeWorkloads")
+package org.opendc.compute.workload
+
+import org.opendc.compute.workload.internal.CompositeComputeWorkload
+import org.opendc.compute.workload.internal.HpcSampledComputeWorkload
+import org.opendc.compute.workload.internal.LoadSampledComputeWorkload
+import org.opendc.compute.workload.internal.TraceComputeWorkload
+
+/**
+ * Construct a workload from a trace.
+ */
+public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name)
+
+/**
+ * Construct a composite workload with the specified fractions.
+ */
+public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload {
+ return CompositeComputeWorkload(pairs.toMap())
+}
+
+/**
+ * Sample a workload by a [fraction] of the total load.
+ */
+public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload {
+ return LoadSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC VMs (count)
+ */
+public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC load
+ */
+public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction, sampleLoad = true)
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
index 43dd8321..4d9ef15d 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
@@ -25,6 +25,7 @@ package org.opendc.compute.workload
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.failure.HostFaultInjector
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -34,5 +35,5 @@ public interface FailureModel {
/**
* Construct a [HostFaultInjector] for the specified [service].
*/
- public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+ public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
index 55c61be1..be7120b9 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
@@ -32,9 +32,9 @@ import org.opendc.compute.simulator.failure.StartStopHostFault
import org.opendc.compute.simulator.failure.StochasticVictimSelector
import java.time.Clock
import java.time.Duration
+import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
-import kotlin.random.Random
/**
* Obtain a [FailureModel] based on the GRID'5000 failure trace.
@@ -42,14 +42,15 @@ import kotlin.random.Random
* This fault injector uses parameters from the GRID'5000 failure trace as described in
* "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
*/
-public fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
+public fun grid5000(failureInterval: Duration): FailureModel {
return object : FailureModel {
override fun createInjector(
context: CoroutineContext,
clock: Clock,
- service: ComputeService
+ service: ComputeService,
+ random: Random
): HostFaultInjector {
- val rng = Well19937c(seed)
+ val rng = Well19937c(random.nextLong())
val hosts = service.hosts.map { it as SimHost }.toSet()
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
@@ -59,7 +60,7 @@ public fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
clock,
hosts,
iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random),
fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
new file mode 100644
index 00000000..40484b68
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload
+
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import java.time.Instant
+import java.util.*
+
+/**
+ * A virtual machine workload.
+ *
+ * @param uid The unique identifier of the virtual machine.
+ * @param name The name of the virtual machine.
+ * @param cpuCount The number of vCPUs in the VM.
+ * @param memCapacity The provisioned memory for the VM.
+ * @param startTime The start time of the VM.
+ * @param stopTime The stop time of the VM.
+ * @param trace The trace fragments that belong to this VM.
+ */
+public data class VirtualMachine(
+ val uid: UUID,
+ val name: String,
+ val cpuCount: Int,
+ val memCapacity: Long,
+ val totalLoad: Double,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val trace: Sequence<SimTraceWorkload.Fragment>,
+)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
new file mode 100644
index 00000000..9b2bec55
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
+ */
+internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
+
+ val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
+
+ val res = mutableListOf<VirtualMachine>()
+
+ for ((fraction, vms) in traces) {
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+ }
+
+ val vmCount = traces.sumOf { (_, vms) -> vms.size }
+ logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
new file mode 100644
index 00000000..52f4c672
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples HPC VMs in the workload.
+ *
+ * @param fraction The fraction of load/virtual machines to sample
+ * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs.
+ */
+internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The pattern to match compute nodes in the workload.
+ */
+ private val pattern = Regex("^(ComputeNode|cn).*")
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+
+ val (hpc, nonHpc) = vms.partition { entry ->
+ val name = entry.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<VirtualMachine>()
+
+ if (sampleLoad) {
+ var currentLoad = 0.0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * vms.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.totalLoad
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * vms.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.totalLoad
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+
+ /**
+ * Sample a random trace entry.
+ */
+ private fun sample(entry: VirtualMachine, i: Int): VirtualMachine {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
new file mode 100644
index 00000000..ef6de729
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that is sampled based on total load.
+ */
+internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+ val res = mutableListOf<VirtualMachine>()
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index bfa2d051..d657ff01 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -20,23 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.trace
+package org.opendc.compute.workload.internal
-import java.util.UUID
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
/**
- * An entry in a workload trace.
- *
- * @param uid The unique identifier of the entry.
- * @param name The name of the entry.
- * @param start The start time of the workload.
- * @param workload The workload of the entry.
- * @param meta The meta-data associated with the workload.
+ * A [ComputeWorkload] from a trace.
*/
-public data class TraceEntry<out T>(
- val uid: UUID,
- val name: String,
- val start: Long,
- val workload: T,
- val meta: Map<String, Any>
-)
+internal class TraceComputeWorkload(val name: String) : ComputeWorkload {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ return loader.get(name)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt
deleted file mode 100644
index 36cd0a7d..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.filter2.predicate.Statistics
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.util.parquet.LocalInputFile
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
- *
- * @param traceFile The directory of the traces.
- * @param selectedVms The list of VMs to read from the trace.
- */
-public class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
- private val logger = KotlinLogging.logger {}
-
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * The intermediate buffer to store the read records in.
- */
- private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
-
- /**
- * An optional filter for filtering the selected VMs
- */
- private val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(
- FilterApi.userDefined(
- FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
- )
- )
- )
-
- /**
- * A poisonous fragment.
- */
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
-
- /**
- * The thread to read the records in.
- */
- private val readerThread = thread(start = true, name = "sc20-reader") {
- val reader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- try {
- while (true) {
- val record = reader.read()
-
- if (record == null) {
- queue.put(poison)
- break
- }
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
- cpuUsage,
- cores
- )
-
- queue.put(id to fragment)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- reader.close()
- }
- }
-
- /**
- * Fill the buffers with the VMs
- */
- private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
- if (!hasNext) {
- return
- }
-
- val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
- queue.drainTo(fragments)
-
- for ((id, fragment) in fragments) {
- if (id == poison.first) {
- hasNext = false
- return
- }
- buffers[id]?.forEach { it.add(fragment) }
- }
- }
-
- /**
- * A flag to indicate whether the reader has more entries.
- */
- private var hasNext: Boolean = true
-
- /**
- * Initialize the reader.
- */
- init {
- val takenIds = mutableSetOf<UUID>()
- val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
-
- val metaReader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- while (true) {
- val record = metaReader.read() ?: break
- val id = record["id"].toString()
- entries[id] = record
- }
-
- metaReader.close()
-
- val selection = selectedVms.ifEmpty { entries.keys }
-
- // Create the entry iterator
- iterator = selection.asSequence()
- .mapNotNull { entries[it] }
- .mapIndexed { index, record ->
- val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
-
- assert(uid !in takenIds)
- takenIds += uid
-
- logger.info { "Processing VM $id" }
-
- val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence {
- var time = submissionTime
- repeat@ while (true) {
- if (externalBuffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
- }
- }
-
- internalBuffer.addAll(externalBuffer)
- externalBuffer.clear()
-
- for (fragment in internalBuffer) {
- yield(fragment)
-
- time += fragment.duration
- if (time >= endTime) {
- break@repeat
- }
- }
-
- internalBuffer.clear()
- }
-
- buffers.remove(id)
- }
- val workload = SimTraceWorkload(fragments)
- val meta = mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
-
- TraceEntry(uid, id, submissionTime, workload, meta)
- }
- .sortedBy { it.start }
- .toList()
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {
- readerThread.interrupt()
- }
-
- private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
- override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
-
- override fun canDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
- }
-
- override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
- }
- }
-}