summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-compute/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-06 13:12:18 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-08 17:11:52 +0200
commit94fa3d33d4ef77aca5e70cc7f91ae9dca71d25e7 (patch)
tree42ec6c1cc5fe027d6e0568986b9a81c03ab90838 /opendc-simulator/opendc-simulator-compute/src/main
parent3098eeb116a80ce12e6575e454d0448867478792 (diff)
perf(simulator): Optimize SimTraceWorkload
This change improves the performance of the SimTraceWorkload class by changing the way trace fragments are read and processed by the CPU consumers.
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt233
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt83
3 files changed, 273 insertions, 81 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
new file mode 100644
index 00000000..4f567b55
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
@@ -0,0 +1,233 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
+import kotlin.math.min
+
+/**
+ * A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s.
+ *
+ * @param usageCol The column containing the CPU usage of each fragment (in MHz).
+ * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis).
+ * @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis).
+ * @param coresCol The column containing the utilized cores.
+ * @param size The number of fragments in the trace.
+ */
+public class SimTrace(
+ private val usageCol: DoubleArray,
+ private val timestampCol: LongArray,
+ private val deadlineCol: LongArray,
+ private val coresCol: IntArray,
+ private val size: Int,
+) {
+ init {
+ require(size >= 0) { "Invalid trace size" }
+ require(usageCol.size >= size) { "Invalid number of usage entries" }
+ require(timestampCol.size >= size) { "Invalid number of timestamp entries" }
+ require(deadlineCol.size >= size) { "Invalid number of deadline entries" }
+ require(coresCol.size >= size) { "Invalid number of core entries" }
+ }
+
+ public companion object {
+ /**
+ * Construct a [SimTrace] with the specified fragments.
+ */
+ public fun ofFragments(fragments: List<SimTraceFragment>): SimTrace {
+ val size = fragments.size
+ val usageCol = DoubleArray(size)
+ val timestampCol = LongArray(size)
+ val deadlineCol = LongArray(size)
+ val coresCol = IntArray(size)
+
+ for (i in fragments.indices) {
+ val fragment = fragments[i]
+ usageCol[i] = fragment.usage
+ timestampCol[i] = fragment.timestamp
+ deadlineCol[i] = fragment.timestamp + fragment.duration
+ coresCol[i] = fragment.cores
+ }
+
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * Construct a [SimTrace] with the specified fragments.
+ */
+ @JvmStatic
+ public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace {
+ val size = fragments.size
+ val usageCol = DoubleArray(size)
+ val timestampCol = LongArray(size)
+ val deadlineCol = LongArray(size)
+ val coresCol = IntArray(size)
+
+ for (i in fragments.indices) {
+ val fragment = fragments[i]
+ usageCol[i] = fragment.usage
+ timestampCol[i] = fragment.timestamp
+ deadlineCol[i] = fragment.timestamp + fragment.duration
+ coresCol[i] = fragment.cores
+ }
+
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * Create a [SimTrace.Builder] instance.
+ */
+ @JvmStatic
+ public fun builder(): Builder = Builder()
+ }
+
+ /**
+ * Construct a new [FlowSource] for the specified [cpu].
+ */
+ public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource {
+ return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * A builder class for a [SimTrace].
+ */
+ public class Builder internal constructor() {
+ /**
+ * The columns of the trace.
+ */
+ private var usageCol: DoubleArray = DoubleArray(16)
+ private var timestampCol: LongArray = LongArray(16)
+ private var deadlineCol: LongArray = LongArray(16)
+ private var coresCol: IntArray = IntArray(16)
+
+ /**
+ * The number of entries in the trace.
+ */
+ private var size = 0
+
+ /**
+ * Add the specified [SimTraceFragment] to the trace.
+ */
+ public fun add(fragment: SimTraceFragment) {
+ add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores)
+ }
+
+ /**
+ * Add a fragment to the trace.
+ *
+ * @param timestamp Timestamp at which the fragment starts (in epoch millis).
+ * @param deadline Timestamp at which the fragment ends (in epoch millis).
+ * @param usage CPU usage of this fragment.
+ * @param cores Number of cores used.
+ */
+ public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
+ val size = size
+
+ if (size == usageCol.size) {
+ grow()
+ }
+
+ timestampCol[size] = timestamp
+ deadlineCol[size] = deadline
+ usageCol[size] = usage
+ coresCol[size] = cores
+
+ this.size++
+ }
+
+ /**
+ * Helper function to grow the capacity of the column arrays.
+ */
+ private fun grow() {
+ val arraySize = usageCol.size
+ val newSize = arraySize * 2
+
+ usageCol = usageCol.copyOf(newSize)
+ timestampCol = timestampCol.copyOf(newSize)
+ deadlineCol = deadlineCol.copyOf(newSize)
+ coresCol = coresCol.copyOf(newSize)
+ }
+
+ /**
+ * Construct the immutable [SimTrace].
+ */
+ public fun build(): SimTrace {
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+ }
+
+ /**
+ * A CPU consumer for the trace workload.
+ */
+ private class CpuConsumer(
+ cpu: ProcessingUnit,
+ private val offset: Long,
+ private val usageCol: DoubleArray,
+ private val timestampCol: LongArray,
+ private val deadlineCol: LongArray,
+ private val coresCol: IntArray,
+ private val size: Int
+ ) : FlowSource {
+ private val id = cpu.id
+ private val coreCount = cpu.node.coreCount
+
+ /**
+ * The index in the trace.
+ */
+ private var _idx = 0
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val size = size
+ val nowOffset = now - offset
+
+ var idx = _idx
+ val deadlines = deadlineCol
+ var deadline = deadlines[idx]
+
+ while (deadline <= nowOffset && ++idx < size) {
+ deadline = deadlines[idx]
+ }
+
+ if (idx >= size) {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ _idx = idx
+ val timestamp = timestampCol[idx]
+
+ // Fragment is in the future
+ if (timestamp > nowOffset) {
+ conn.push(0.0)
+ return timestamp - nowOffset
+ }
+
+ val cores = min(coreCount, coresCol[idx])
+ val usage = usageCol[idx]
+
+ conn.push(if (id < cores) usage / cores else 0.0)
+ return deadline - nowOffset
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt
new file mode 100644
index 00000000..5285847f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+/**
+ * A fragment of the workload trace.
+ *
+ * @param timestamp The timestamp at which the fragment starts (in epoch millis).
+ * @param duration The duration of the fragment (in milliseconds).
+ * @param usage The CPU usage during the fragment (in MHz).
+ * @param cores The amount of cores utilized during the fragment.
+ */
+public data class SimTraceFragment(
+ @JvmField val timestamp: Long,
+ @JvmField val duration: Long,
+ @JvmField val usage: Double,
+ @JvmField val cores: Int
+)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 49ae5933..53c98409 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -23,10 +23,6 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-import kotlin.math.min
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -35,89 +31,14 @@ import kotlin.math.min
* @param trace The trace of fragments to use.
* @param offset The offset for the timestamps.
*/
-public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload {
- private val iterator = trace.iterator()
- private var fragment: Fragment? = null
-
+public class SimTraceWorkload(private val trace: SimTrace, private val offset: Long = 0L) : SimWorkload {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
- cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model)))
+ cpu.startConsumer(lifecycle.waitFor(trace.newSource(cpu.model, offset)))
}
}
override fun toString(): String = "SimTraceWorkload"
-
- /**
- * Obtain the fragment with a timestamp equal or greater than [now].
- */
- private fun pullFragment(now: Long): Fragment? {
- // Return the most recent fragment if its starting time + duration is later than `now`
- var fragment = fragment
- if (fragment != null && fragment.timestamp + offset + fragment.duration > now) {
- return fragment
- }
-
- while (iterator.hasNext()) {
- fragment = iterator.next()
- if (fragment.timestamp + offset + fragment.duration > now) {
- this.fragment = fragment
- return fragment
- }
- }
-
- this.fragment = null
- return null
- }
-
- private inner class Consumer(cpu: ProcessingUnit) : FlowSource {
- private val offset = this@SimTraceWorkload.offset
- private val id = cpu.id
- private val coreCount = cpu.node.coreCount
-
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- val fragment = pullFragment(now)
-
- if (fragment == null) {
- conn.close()
- return Long.MAX_VALUE
- }
-
- val timestamp = fragment.timestamp + offset
-
- // Fragment is in the future
- if (timestamp > now) {
- conn.push(0.0)
- return timestamp - now
- }
-
- val cores = min(coreCount, fragment.cores)
- val usage = if (fragment.cores > 0)
- fragment.usage / cores
- else
- 0.0
- val deadline = timestamp + fragment.duration
- val duration = deadline - now
-
- conn.push(if (id < cores && usage > 0.0) usage else 0.0)
-
- return duration
- }
- }
-
- /**
- * A fragment of the workload.
- *
- * @param timestamp The timestamp at which the fragment starts.
- * @param duration The duration of the fragment.
- * @param usage The CPU usage during the fragment.
- * @param cores The amount of cores utilized during the fragment.
- */
- public data class Fragment(
- @JvmField val timestamp: Long,
- @JvmField val duration: Long,
- @JvmField val usage: Double,
- @JvmField val cores: Int
- )
}