summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt32
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt61
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt6
-rw-r--r--opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt233
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt83
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt52
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt40
11 files changed, 405 insertions, 172 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 26089b6d..a0ff9228 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -39,6 +39,8 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -105,11 +107,11 @@ internal class SimHostTest {
emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 183.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2)
),
offset = 1
)
@@ -121,11 +123,11 @@ internal class SimHostTest {
emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2)
),
offset = 1
)
@@ -217,11 +219,11 @@ internal class SimHostTest {
emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 2000L, duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 3000L, duration * 1000, 2 * 183.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2),
+ SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2)
),
offset = 1
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 7c579e39..1a6624f7 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -23,13 +23,14 @@
package org.opendc.compute.workload
import mu.KotlinLogging
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.trace.*
import java.io.File
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
+import kotlin.math.max
import kotlin.math.roundToLong
/**
@@ -51,7 +52,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* Read the fragments into memory.
*/
- private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> {
+ private fun parseFragments(trace: Trace): Map<String, Builder> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val idCol = reader.resolve(RESOURCE_ID)
@@ -60,7 +61,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE)
- val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
+ val fragments = mutableMapOf<String, Builder>()
return try {
while (reader.nextRow()) {
@@ -70,14 +71,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
- val fragment = SimTraceWorkload.Fragment(
- time.toEpochMilli(),
- duration.toMillis(),
- cpuUsage,
- cores
- )
-
- fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment)
+ val timeMs = time.toEpochMilli()
+ val deadlineMs = timeMs + duration.toMillis()
+ val builder = fragments.computeIfAbsent(id) { Builder() }
+ builder.add(timeMs, deadlineMs, cpuUsage, cores)
}
fragments
@@ -89,7 +86,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> {
+ private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
val idCol = reader.resolve(RESOURCE_ID)
@@ -115,8 +112,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
- val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs
+ val builder = fragments.getValue(id)
+ val totalLoad = builder.totalLoad
entries.add(
VirtualMachine(
@@ -127,7 +124,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
totalLoad,
submissionTime,
endTime,
- vmFragments
+ builder.build()
)
)
}
@@ -165,4 +162,38 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
public fun reset() {
cache.clear()
}
+
+ /**
+ * A builder for a VM trace.
+ */
+ private class Builder {
+ /**
+ * The total load of the trace.
+ */
+ @JvmField var totalLoad: Double = 0.0
+
+ /**
+ * The internal builder for the trace.
+ */
+ private val builder = SimTrace.builder()
+
+ /**
+ * Add a fragment to the trace.
+ *
+ * @param timestamp Timestamp at which the fragment starts (in epoch millis).
+ * @param deadline Timestamp at which the fragment ends (in epoch millis).
+ * @param usage CPU usage of this fragment.
+ * @param cores Number of cores used.
+ */
+ fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
+ val duration = max(0, deadline - timestamp)
+ totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
+ builder.add(timestamp, deadline, usage, cores)
+ }
+
+ /**
+ * Build the trace.
+ */
+ fun build(): SimTrace = builder.build()
+ }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
index 40484b68..5dd239f6 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -22,7 +22,7 @@
package org.opendc.compute.workload
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimTrace
import java.time.Instant
import java.util.*
@@ -35,7 +35,7 @@ import java.util.*
* @param memCapacity The provisioned memory for the VM.
* @param startTime The start time of the VM.
* @param stopTime The stop time of the VM.
- * @param trace The trace fragments that belong to this VM.
+ * @param trace The trace that belong to this VM.
*/
public data class VirtualMachine(
val uid: UUID,
@@ -45,5 +45,5 @@ public data class VirtualMachine(
val totalLoad: Double,
val startTime: Instant,
val stopTime: Instant,
- val trace: Sequence<SimTraceWorkload.Fragment>,
+ val trace: SimTrace,
)
diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
index a119a219..bbe130e3 100644
--- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
+++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
@@ -23,12 +23,15 @@
package org.opendc.experiments.serverless.trace
import org.opendc.faas.simulator.workload.SimFaaSWorkload
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
/**
* A [SimFaaSWorkload] for a [FunctionTrace].
*/
-public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.timestamp, it.duration, it.cpuUsage, 1) }) {
+class FunctionTraceWorkload(trace: FunctionTrace) :
+ SimFaaSWorkload, SimWorkload by SimTraceWorkload(SimTrace.ofFragments(trace.samples.map { SimTraceFragment(it.timestamp, it.duration, it.cpuUsage, 1) })) {
override suspend fun invoke() {}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index b8e0227a..cb52d24f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.compute
-import javafx.application.Application.launch
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -34,6 +33,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var machineModel: MachineModel
- private lateinit var trace: Sequence<SimTraceWorkload.Fragment>
+ private lateinit var trace: SimTrace
@Setup
fun setUp() {
@@ -60,8 +60,13 @@ class SimMachineBenchmarks {
)
val random = ThreadLocalRandom.current()
- val entries = List(10000) { SimTraceWorkload.Fragment(it * 1000L, 1000, random.nextDouble(0.0, 4500.0), 1) }
- trace = entries.asSequence()
+ val builder = SimTrace.builder()
+ repeat(10000) {
+ val timestamp = it.toLong()
+ val deadline = timestamp + 1000
+ builder.add(timestamp, deadline, random.nextDouble(0.0, 4500.0), 1)
+ }
+ trace = builder.build()
}
@Benchmark
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
new file mode 100644
index 00000000..4f567b55
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
@@ -0,0 +1,233 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
+import kotlin.math.min
+
+/**
+ * A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s.
+ *
+ * @param usageCol The column containing the CPU usage of each fragment (in MHz).
+ * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis).
+ * @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis).
+ * @param coresCol The column containing the utilized cores.
+ * @param size The number of fragments in the trace.
+ */
+public class SimTrace(
+ private val usageCol: DoubleArray,
+ private val timestampCol: LongArray,
+ private val deadlineCol: LongArray,
+ private val coresCol: IntArray,
+ private val size: Int,
+) {
+ init {
+ require(size >= 0) { "Invalid trace size" }
+ require(usageCol.size >= size) { "Invalid number of usage entries" }
+ require(timestampCol.size >= size) { "Invalid number of timestamp entries" }
+ require(deadlineCol.size >= size) { "Invalid number of deadline entries" }
+ require(coresCol.size >= size) { "Invalid number of core entries" }
+ }
+
+ public companion object {
+ /**
+ * Construct a [SimTrace] with the specified fragments.
+ */
+ public fun ofFragments(fragments: List<SimTraceFragment>): SimTrace {
+ val size = fragments.size
+ val usageCol = DoubleArray(size)
+ val timestampCol = LongArray(size)
+ val deadlineCol = LongArray(size)
+ val coresCol = IntArray(size)
+
+ for (i in fragments.indices) {
+ val fragment = fragments[i]
+ usageCol[i] = fragment.usage
+ timestampCol[i] = fragment.timestamp
+ deadlineCol[i] = fragment.timestamp + fragment.duration
+ coresCol[i] = fragment.cores
+ }
+
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * Construct a [SimTrace] with the specified fragments.
+ */
+ @JvmStatic
+ public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace {
+ val size = fragments.size
+ val usageCol = DoubleArray(size)
+ val timestampCol = LongArray(size)
+ val deadlineCol = LongArray(size)
+ val coresCol = IntArray(size)
+
+ for (i in fragments.indices) {
+ val fragment = fragments[i]
+ usageCol[i] = fragment.usage
+ timestampCol[i] = fragment.timestamp
+ deadlineCol[i] = fragment.timestamp + fragment.duration
+ coresCol[i] = fragment.cores
+ }
+
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * Create a [SimTrace.Builder] instance.
+ */
+ @JvmStatic
+ public fun builder(): Builder = Builder()
+ }
+
+ /**
+ * Construct a new [FlowSource] for the specified [cpu].
+ */
+ public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource {
+ return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * A builder class for a [SimTrace].
+ */
+ public class Builder internal constructor() {
+ /**
+ * The columns of the trace.
+ */
+ private var usageCol: DoubleArray = DoubleArray(16)
+ private var timestampCol: LongArray = LongArray(16)
+ private var deadlineCol: LongArray = LongArray(16)
+ private var coresCol: IntArray = IntArray(16)
+
+ /**
+ * The number of entries in the trace.
+ */
+ private var size = 0
+
+ /**
+ * Add the specified [SimTraceFragment] to the trace.
+ */
+ public fun add(fragment: SimTraceFragment) {
+ add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores)
+ }
+
+ /**
+ * Add a fragment to the trace.
+ *
+ * @param timestamp Timestamp at which the fragment starts (in epoch millis).
+ * @param deadline Timestamp at which the fragment ends (in epoch millis).
+ * @param usage CPU usage of this fragment.
+ * @param cores Number of cores used.
+ */
+ public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
+ val size = size
+
+ if (size == usageCol.size) {
+ grow()
+ }
+
+ timestampCol[size] = timestamp
+ deadlineCol[size] = deadline
+ usageCol[size] = usage
+ coresCol[size] = cores
+
+ this.size++
+ }
+
+ /**
+ * Helper function to grow the capacity of the column arrays.
+ */
+ private fun grow() {
+ val arraySize = usageCol.size
+ val newSize = arraySize * 2
+
+ usageCol = usageCol.copyOf(newSize)
+ timestampCol = timestampCol.copyOf(newSize)
+ deadlineCol = deadlineCol.copyOf(newSize)
+ coresCol = coresCol.copyOf(newSize)
+ }
+
+ /**
+ * Construct the immutable [SimTrace].
+ */
+ public fun build(): SimTrace {
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+ }
+
+ /**
+ * A CPU consumer for the trace workload.
+ */
+ private class CpuConsumer(
+ cpu: ProcessingUnit,
+ private val offset: Long,
+ private val usageCol: DoubleArray,
+ private val timestampCol: LongArray,
+ private val deadlineCol: LongArray,
+ private val coresCol: IntArray,
+ private val size: Int
+ ) : FlowSource {
+ private val id = cpu.id
+ private val coreCount = cpu.node.coreCount
+
+ /**
+ * The index in the trace.
+ */
+ private var _idx = 0
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val size = size
+ val nowOffset = now - offset
+
+ var idx = _idx
+ val deadlines = deadlineCol
+ var deadline = deadlines[idx]
+
+ while (deadline <= nowOffset && ++idx < size) {
+ deadline = deadlines[idx]
+ }
+
+ if (idx >= size) {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ _idx = idx
+ val timestamp = timestampCol[idx]
+
+ // Fragment is in the future
+ if (timestamp > nowOffset) {
+ conn.push(0.0)
+ return timestamp - nowOffset
+ }
+
+ val cores = min(coreCount, coresCol[idx])
+ val usage = usageCol[idx]
+
+ conn.push(if (id < cores) usage / cores else 0.0)
+ return deadline - nowOffset
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt
new file mode 100644
index 00000000..5285847f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+/**
+ * A fragment of the workload trace.
+ *
+ * @param timestamp The timestamp at which the fragment starts (in epoch millis).
+ * @param duration The duration of the fragment (in milliseconds).
+ * @param usage The CPU usage during the fragment (in MHz).
+ * @param cores The amount of cores utilized during the fragment.
+ */
+public data class SimTraceFragment(
+ @JvmField val timestamp: Long,
+ @JvmField val duration: Long,
+ @JvmField val usage: Double,
+ @JvmField val cores: Int
+)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 49ae5933..53c98409 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -23,10 +23,6 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-import kotlin.math.min
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -35,89 +31,14 @@ import kotlin.math.min
* @param trace The trace of fragments to use.
* @param offset The offset for the timestamps.
*/
-public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload {
- private val iterator = trace.iterator()
- private var fragment: Fragment? = null
-
+public class SimTraceWorkload(private val trace: SimTrace, private val offset: Long = 0L) : SimWorkload {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
- cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model)))
+ cpu.startConsumer(lifecycle.waitFor(trace.newSource(cpu.model, offset)))
}
}
override fun toString(): String = "SimTraceWorkload"
-
- /**
- * Obtain the fragment with a timestamp equal or greater than [now].
- */
- private fun pullFragment(now: Long): Fragment? {
- // Return the most recent fragment if its starting time + duration is later than `now`
- var fragment = fragment
- if (fragment != null && fragment.timestamp + offset + fragment.duration > now) {
- return fragment
- }
-
- while (iterator.hasNext()) {
- fragment = iterator.next()
- if (fragment.timestamp + offset + fragment.duration > now) {
- this.fragment = fragment
- return fragment
- }
- }
-
- this.fragment = null
- return null
- }
-
- private inner class Consumer(cpu: ProcessingUnit) : FlowSource {
- private val offset = this@SimTraceWorkload.offset
- private val id = cpu.id
- private val coreCount = cpu.node.coreCount
-
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- val fragment = pullFragment(now)
-
- if (fragment == null) {
- conn.close()
- return Long.MAX_VALUE
- }
-
- val timestamp = fragment.timestamp + offset
-
- // Fragment is in the future
- if (timestamp > now) {
- conn.push(0.0)
- return timestamp - now
- }
-
- val cores = min(coreCount, fragment.cores)
- val usage = if (fragment.cores > 0)
- fragment.usage / cores
- else
- 0.0
- val deadline = timestamp + fragment.duration
- val duration = deadline - now
-
- conn.push(if (id < cores && usage > 0.0) usage else 0.0)
-
- return duration
- }
- }
-
- /**
- * A fragment of the workload.
- *
- * @param timestamp The timestamp at which the fragment starts.
- * @param duration The duration of the fragment.
- * @param usage The CPU usage during the fragment.
- * @param cores The amount of cores utilized during the fragment.
- */
- public data class Fragment(
- @JvmField val timestamp: Long,
- @JvmField val duration: Long,
- @JvmField val usage: Double,
- @JvmField val cores: Int
- )
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 9db2e6ec..6f32cf46 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -38,6 +38,8 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -66,11 +68,11 @@ internal class SimFairShareHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
@@ -106,20 +108,20 @@ internal class SimFairShareHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3100.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
@@ -201,20 +203,20 @@ internal class SimFairShareHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 3100.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index b05ffd22..02d308ff 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -36,9 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.simulator.compute.workload.SimRuntimeWorkload
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -66,11 +64,11 @@ internal class SimSpaceSharedHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index cdbffe4b..574860e8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -58,11 +58,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 2),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 0
)
@@ -85,11 +85,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 2),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 1000
)
@@ -112,11 +112,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 2),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 0
)
@@ -140,11 +140,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 0),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 0),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 0
)