author     Georgios Andreadis <g.andreadis@student.tudelft.nl>    2020-04-20 15:21:35 +0200
committer  Georgios Andreadis <g.andreadis@student.tudelft.nl>    2020-04-20 15:21:35 +0200
commit     6981d5581e2ce5c6df42dfbf133c350bd9c35a0f (patch)
tree       74e8993babb28dd56950f1da69eda2d97735a0e8
parent     60372f0022d423efd5267ef4008d9afcbe870911 (diff)
parent     3e056406616860c77168d827f1ca9d8d3c79c08e (diff)

Merge branch 'bug/experiment-issues' into '2.x'

Address issues during experiments

See merge request opendc/opendc-simulator!61
-rw-r--r--  .gitignore | 4
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt | 2
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt | 129
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 6
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 64
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 44
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt | 7
-rw-r--r--  opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt | 83
-rw-r--r--  opendc/opendc-experiments-sc20/build.gradle.kts | 6
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 101
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt | 290
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 65
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt | 193
-rw-r--r--  opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt | 5
-rw-r--r--  opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt | 2
-rw-r--r--  opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt | 7
-rw-r--r--  opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt | 86
-rw-r--r--  opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt | 13

18 files changed, 894 insertions(+), 213 deletions(-)
diff --git a/.gitignore b/.gitignore
index 2cc67507..4ec6f778 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,9 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
+
+# data
+data/
+
### JetBrains
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index b0688f99..36bbfa45 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -14,7 +14,7 @@ class VmImage(
public override val uid: UUID,
public override val name: String,
public override val tags: TagContainer,
- public val flopsHistory: List<FlopsHistoryFragment>,
+ public val flopsHistory: Sequence<FlopsHistoryFragment>,
public val maxCores: Int,
public val requiredMemory: Long
) : Image {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
new file mode 100644
index 00000000..45024a49
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
@@ -0,0 +1,129 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.core.workload
+
+import com.atlarge.opendc.compute.core.Server
+import java.util.SortedSet
+import java.util.TreeSet
+import kotlin.random.Random
+
+/**
+ * Meta-data key for the [PerformanceInterferenceModel] of an image.
+ */
+const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference"
+
+/**
+ * Performance Interference Model describing the variability incurred by different sets of workloads if colocated.
+ *
+ * @param items The [PerformanceInterferenceModelItem]s that make up this model.
+ */
+class PerformanceInterferenceModel(
+ items: Set<PerformanceInterferenceModelItem>,
+ val random: Random = Random(0)
+) {
+ private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList()
+ private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs ->
+ var cmp = lhs.performanceScore.compareTo(rhs.performanceScore)
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad)
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ lhs.hashCode().compareTo(rhs.hashCode())
+ }
+ val items = TreeSet(comparator)
+ private val colocatedWorkloads = TreeSet<String>()
+
+ init {
+ this.items.addAll(items)
+ }
+
+ fun vmStarted(server: Server) {
+ colocatedWorkloads.add(server.image.name)
+ intersectingItems = items.filter { item -> doesMatch(item) }
+ }
+
+ fun vmStopped(server: Server) {
+ colocatedWorkloads.remove(server.image.name)
+ intersectingItems = items.filter { item -> doesMatch(item) }
+ }
+
+ private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean {
+ var count = 0
+ for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) {
+ if (name in colocatedWorkloads)
+ count++
+ if (count > 1)
+ return true
+ }
+ return false
+ }
+
+ fun apply(currentServerLoad: Double): Double {
+ if (intersectingItems.isEmpty()) {
+ return 1.0
+ }
+ val score = intersectingItems
+ .firstOrNull { it.minServerLoad <= currentServerLoad }
+
+ // Apply performance penalty to (on average) only one of the VMs
+ return if (score != null && random.nextInt(score.workloadNames.size) == 0) {
+ score.performanceScore
+ } else {
+ 1.0
+ }
+ }
+}
+
+/**
+ * Model describing how a specific set of workloads causes performance variability for each workload.
+ *
+ * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set.
+ * @param minServerLoad The minimum total server load at which this interference is activated and noticeable.
+ * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no
+ * influence, <1 means that performance degrades, and >1 means that performance improves.
+ */
+data class PerformanceInterferenceModelItem(
+ val workloadNames: SortedSet<String>,
+ val minServerLoad: Double,
+ val performanceScore: Double
+) {
+ override fun equals(other: Any?): Boolean {
+ if (this === other) return true
+ if (javaClass != other?.javaClass) return false
+
+ other as PerformanceInterferenceModelItem
+
+ if (workloadNames != other.workloadNames) return false
+
+ return true
+ }
+
+ override fun hashCode(): Int = workloadNames.hashCode()
+}
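
A minimal usage sketch of the relocated model, assuming only the constructor and methods added above; the workload names, load threshold, and score are placeholders, and in the simulator vmStarted/vmStopped are driven by Server instances inside SimpleVirtDriver rather than called directly:

import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem
import java.util.TreeSet
import kotlin.random.Random

fun main() {
    // Hypothetical item: "vm-a" and "vm-b" interfere once total host load reaches 0.5,
    // degrading the affected workload to 80% of its nominal performance.
    val item = PerformanceInterferenceModelItem(
        workloadNames = TreeSet(listOf("vm-a", "vm-b")),
        minServerLoad = 0.5,
        performanceScore = 0.8
    )
    val model = PerformanceInterferenceModel(setOf(item), random = Random(42))

    // No colocated workloads have been registered yet, so apply() is a no-op.
    println(model.apply(0.9)) // 1.0

    // Once SimpleVirtDriver has called vmStarted(server) for both "vm-a" and "vm-b",
    // apply(load) with load >= 0.5 returns 0.8 for (on average) one of the two VMs.
}
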
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 844938db..08f04760 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -120,7 +120,7 @@ public class SimpleBareMetalDriver(
/**
* The internal random instance.
*/
- private val random = Random(0)
+ private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
override suspend fun init(): Node = withContext(domain.coroutineContext) {
nodeState.value
@@ -134,7 +134,7 @@ public class SimpleBareMetalDriver(
val events = EventFlow<ServerEvent>()
val server = Server(
- UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()),
+ UUID(random.nextLong(), random.nextLong()),
node.name,
emptyMap(),
flavor,
@@ -151,7 +151,7 @@ public class SimpleBareMetalDriver(
override suspend fun stop(): Node = withContext(domain.coroutineContext) {
val node = nodeState.value
- if (node.state == NodeState.SHUTOFF || node.state == NodeState.ERROR) {
+ if (node.state == NodeState.SHUTOFF) {
return@withContext node
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 73f9dd5c..53fa463b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -40,31 +40,26 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
-import kotlinx.coroutines.CancellableContinuation
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
-import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.filter
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
import java.util.Objects
import java.util.TreeSet
import java.util.UUID
+import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -101,15 +96,6 @@ class SimpleVirtDriver(
override val events: Flow<HypervisorEvent> = eventFlow
init {
- events.filter { it is HypervisorEvent.VmsUpdated }.onEach {
- val imagesRunning = vms.map { it.server.image }.toSet()
- vms.forEach {
- val performanceModel =
- it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
- performanceModel?.computeIntersectingItems(imagesRunning)
- }
- }.launchIn(this)
-
launch {
try {
scheduler()
@@ -140,6 +126,7 @@ class SimpleVirtDriver(
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events, simulationContext.domain))
+ vmStarted(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
@@ -148,6 +135,22 @@ class SimpleVirtDriver(
eventFlow.close()
}
+ private fun vmStarted(server: Server) {
+ vms.forEach {
+ val performanceModel =
+ it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ performanceModel?.vmStarted(server)
+ }
+ }
+
+ private fun vmStopped(server: Server) {
+ vms.forEach {
+ val performanceModel =
+ it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ performanceModel?.vmStopped(server)
+ }
+ }
+
/**
* A scheduling command processed by the scheduler.
*/
@@ -240,7 +243,7 @@ class SimpleVirtDriver(
}
// XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
- duration = max(300.0, ceil(duration))
+ duration = 300.0
val totalAllocatedUsage = maxUsage - availableUsage
var totalAllocatedBurst = 0L
@@ -325,14 +328,14 @@ class SimpleVirtDriver(
requests.removeAll(vmRequests)
// Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vm.cont?.resume(Unit)
+ vm.chan?.resume(Unit)
}
}
eventFlow.emit(
HypervisorEvent.SliceFinished(
this@SimpleVirtDriver,
- totalRequestedSubBurst,
+ totalRequestedBurst,
min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
totalOvercommissionedBurst,
totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
@@ -371,7 +374,7 @@ class SimpleVirtDriver(
val vm: VmServerContext,
val vcpu: ProcessingUnit,
var burst: Long,
- val limit: Double
+ var limit: Double
) {
/**
* The usage that was actually granted.
@@ -395,7 +398,7 @@ class SimpleVirtDriver(
private var finalized: Boolean = false
lateinit var burst: LongArray
var deadline: Long = 0L
- var cont: CancellableContinuation<Unit>? = null
+ var chan: Continuation<Unit>? = null
private var initialized: Boolean = false
internal val job: Job = launch {
@@ -443,6 +446,7 @@ class SimpleVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
+ vmStopped(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
events.close()
}
@@ -451,6 +455,7 @@ class SimpleVirtDriver(
require(burst.size == limit.size) { "Array dimensions do not match" }
this.deadline = deadline
this.burst = burst
+
val requests = cpus.asSequence()
.take(burst.size)
.mapIndexed { i, cpu ->
@@ -465,16 +470,13 @@ class SimpleVirtDriver(
// Wait until the burst has been run or the coroutine is cancelled
try {
- schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
- suspendCancellableCoroutine<Unit> { cont = it }
+ schedulingQueue.offer(SchedulerCommand.Schedule(this, requests))
+ suspendCoroutine<Unit> { chan = it }
} catch (e: CancellationException) {
// Deschedule the VM
- withContext(NonCancellable) {
- requests.forEach { it.isCancelled = true }
- schedulingQueue.send(SchedulerCommand.Interrupt)
- suspendCancellableCoroutine<Unit> { cont = it }
- }
-
+ requests.forEach { it.isCancelled = true }
+ schedulingQueue.offer(SchedulerCommand.Interrupt)
+ suspendCoroutine<Unit> { chan = it }
e.assertFailure()
}
}
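
The switch above from CancellableContinuation plus send to a plain Continuation (chan) and a non-suspending offer boils down to a park-and-resume handshake; a standalone sketch with placeholder names, assuming only kotlinx-coroutines on the classpath:

import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

fun main() = runBlocking {
    var chan: Continuation<Unit>? = null

    // Plays the role of VmServerContext.run(): park until the scheduler finishes the burst.
    val vcpu = launch {
        println("vCPU: submitting burst and parking")
        suspendCoroutine<Unit> { chan = it }
        println("vCPU: resumed")
    }

    while (chan == null) yield() // let the vCPU reach its suspension point
    chan?.resume(Unit)           // scheduler side: vm.chan?.resume(Unit)
    vcpu.join()
}
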
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 6200ad7c..520f6dc5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,7 +1,6 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
@@ -20,7 +19,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -28,6 +26,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
+import kotlin.math.max
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
@@ -55,7 +54,14 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- override val hypervisorEvents: Flow<HypervisorEvent> = EventFlow()
+ public var submittedVms = 0L
+ public var queuedVms = 0L
+ public var runningVms = 0L
+ public var finishedVms = 0L
+ public var unscheduledVms = 0L
+
+ private var maxCores = 0
+ private var maxMemory = 0L
/**
* The allocation logic to use.
@@ -87,6 +93,8 @@ class SimpleVirtProvisioningService(
image: Image,
flavor: Flavor
): Server = withContext(coroutineContext) {
+ submittedVms++
+ queuedVms++
suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
@@ -121,15 +129,24 @@ class SimpleVirtProvisioningService(
}
private suspend fun schedule() {
- val log = simulationContext.log
+ val clock = simulationContext.clock
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
val requiredMemory = (imageInstance.image as VmImage).requiredMemory
- val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break
+ val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
+
+ if (selectedHv == null) {
+ if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
+ unscheduledVms++
+ println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}")
+ }
+
+ break
+ }
try {
- log.info("Spawning ${imageInstance.image} on ${selectedHv.server}")
+ println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}")
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -145,6 +162,8 @@ class SimpleVirtProvisioningService(
)
imageInstance.server = server
imageInstance.continuation.resume(server)
+ queuedVms--
+ runningVms++
activeImages += imageInstance
server.events
@@ -152,6 +171,10 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
+ println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}")
+ runningVms--
+ finishedVms++
+
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -170,6 +193,8 @@ class SimpleVirtProvisioningService(
selectedHv.numberOfActiveServers--
selectedHv.provisionedCores -= imageInstance.flavor.cpuCount
selectedHv.availableMemory += requiredMemory
+ } catch (e: Throwable) {
+ e.printStackTrace()
}
}
}
@@ -188,13 +213,18 @@ class SimpleVirtProvisioningService(
server.flavor.memorySize,
0
)
+ maxCores = max(maxCores, server.flavor.cpuCount)
+ maxMemory = max(maxMemory, server.flavor.memorySize)
hypervisors[server] = hv
}
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
- requestCycle()
+
+ if (incomingImages.isNotEmpty()) {
+ requestCycle()
+ }
}
else -> throw IllegalStateException()
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 550048a4..2ad7df84 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -3,10 +3,8 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
-import kotlinx.coroutines.flow.Flow
/**
* A service for VM provisioning on a cloud.
@@ -18,11 +16,6 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
- * The events emitted by the hypervisors.
- */
- public val hypervisorEvents: Flow<HypervisorEvent>
-
- /**
* Obtain the active hypervisors for this provisioner.
*/
public suspend fun drivers(): Set<VirtDriver>
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt
deleted file mode 100644
index 04056394..00000000
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt
+++ /dev/null
@@ -1,83 +0,0 @@
-package com.atlarge.opendc.core.workload
-
-import com.atlarge.opendc.core.resource.Resource
-import kotlin.random.Random
-
-/**
- * Meta-data key for the [PerformanceInterferenceModel] of an image.
- */
-const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference"
-
-/**
- * Performance Interference Model describing the variability incurred by different sets of workloads if colocated.
- *
- * @param items The [PerformanceInterferenceModelItem]s that make up this model.
- */
-data class PerformanceInterferenceModel(
- val items: Set<PerformanceInterferenceModelItem>,
- val random: Random = Random(0)
-) {
- private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList()
- private var comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs ->
- var cmp = lhs.performanceScore.compareTo(rhs.performanceScore)
- if (cmp != 0) {
- return@Comparator cmp
- }
-
- cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad)
- if (cmp != 0) {
- return@Comparator cmp
- }
-
- 0
- }
-
- fun computeIntersectingItems(colocatedWorkloads: Set<Resource>) {
- val colocatedWorkloadIds = colocatedWorkloads.map { it.name }
- intersectingItems = items.filter { item ->
- colocatedWorkloadIds.intersect(item.workloadNames).size > 1
- }.sortedWith(comparator)
- }
-
- fun apply(currentServerLoad: Double): Double {
- if (intersectingItems.isEmpty()) {
- return 1.0
- }
- val score = intersectingItems
- .firstOrNull { it.minServerLoad <= currentServerLoad }
-
- // Apply performance penalty to (on average) only one of the VMs
- return if (score != null && random.nextInt(score.workloadNames.size) == 0) {
- score.performanceScore
- } else {
- 1.0
- }
- }
-}
-
-/**
- * Model describing how a specific set of workloads causes performance variability for each workload.
- *
- * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set.
- * @param minServerLoad The minimum total server load at which this interference is activated and noticeable.
- * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no
- * influence, <1 means that performance degrades, and >1 means that performance improves.
- */
-data class PerformanceInterferenceModelItem(
- val workloadNames: Set<String>,
- val minServerLoad: Double,
- val performanceScore: Double
-) {
- override fun equals(other: Any?): Boolean {
- if (this === other) return true
- if (javaClass != other?.javaClass) return false
-
- other as PerformanceInterferenceModelItem
-
- if (workloadNames != other.workloadNames) return false
-
- return true
- }
-
- override fun hashCode(): Int = workloadNames.hashCode()
-}
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 28b8ae12..8611ffa7 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -32,6 +32,7 @@ plugins {
application {
mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt"
+ applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M")
}
dependencies {
@@ -40,7 +41,10 @@ dependencies {
implementation(kotlin("stdlib"))
implementation("com.xenomachina:kotlin-argparser:2.0.7")
api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")
-
+ implementation("org.apache.parquet:parquet-avro:1.11.0")
+ implementation("org.apache.hadoop:hadoop-client:3.2.1") {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 7e6398bb..bac0de21 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -6,23 +6,71 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.flow.first
-import java.io.BufferedWriter
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.Closeable
-import java.io.FileWriter
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
class Sc20Monitor(
destination: String
) : Closeable {
- private val outputFile = BufferedWriter(FileWriter(destination))
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
-
- init {
- outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw\n")
+ private val schema = SchemaBuilder
+ .record("slice")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("requestedBurst").type().longType().noDefault()
+ .name("grantedBurst").type().longType().noDefault()
+ .name("overcommissionedBurst").type().longType().noDefault()
+ .name("interferedBurst").type().longType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("cpuDemand").type().doubleType().noDefault()
+ .name("numberOfDeployedImages").type().intType().noDefault()
+ .name("server").type().stringType().noDefault()
+ .name("hostState").type().stringType().noDefault()
+ .name("hostUsage").type().doubleType().noDefault()
+ .name("powerDraw").type().doubleType().noDefault()
+ .name("totalSubmittedVms").type().longType().noDefault()
+ .name("totalQueuedVms").type().longType().noDefault()
+ .name("totalRunningVms").type().longType().noDefault()
+ .name("totalFinishedVms").type().longType().noDefault()
+ .endRecord()
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+ private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
+ private val writerThread = thread(start = true, name = "sc20-writer") {
+ try {
+ while (true) {
+ val record = queue.take()
+ writer.write(record)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ writer.close()
+ }
}
suspend fun onVmStateChanged(server: Server) {}
- suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
+ suspend fun serverStateChanged(
+ driver: VirtDriver,
+ server: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long
+ ) {
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
val duration = simulationContext.clock.millis() - lastServerState.second
@@ -36,6 +84,10 @@ class Sc20Monitor(
0.0,
0,
server,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms,
duration
)
}
@@ -55,21 +107,44 @@ class Sc20Monitor(
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long,
duration: Long = 5 * 60 * 1000L
) {
- lastServerStates.remove(hostServer)
-
// Assume for now that the host is not virtualized and measure the current power draw
val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
- outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
- outputFile.newLine()
+ val record = GenericData.Record(schema)
+ record.put("time", time)
+ record.put("duration", duration)
+ record.put("requestedBurst", requestedBurst)
+ record.put("grantedBurst", grantedBurst)
+ record.put("overcommissionedBurst", overcommissionedBurst)
+ record.put("interferedBurst", interferedBurst)
+ record.put("cpuUsage", cpuUsage)
+ record.put("cpuDemand", cpuDemand)
+ record.put("numberOfDeployedImages", numberOfDeployedImages)
+ record.put("server", hostServer.uid)
+ record.put("hostState", hostServer.state)
+ record.put("hostUsage", usage)
+ record.put("powerDraw", powerDraw)
+ record.put("totalSubmittedVms", submittedVms)
+ record.put("totalQueuedVms", queuedVms)
+ record.put("totalRunningVms", runningVms)
+ record.put("totalFinishedVms", finishedVms)
+
+ queue.put(record)
}
override fun close() {
- outputFile.flush()
- outputFile.close()
+ // Busy loop to wait for writer thread to finish
+ while (queue.isNotEmpty()) {
+ Thread.sleep(500)
+ }
+ writerThread.interrupt()
}
}
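
The new write path hands records from the simulation to a dedicated writer thread over a bounded queue; a standalone sketch of that producer/consumer shape, where println stands in for the Parquet writer (an assumption of this sketch, not the actual Sc20Monitor code):

import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

fun main() {
    val queue = ArrayBlockingQueue<String>(2048)
    val writerThread = thread(start = true, name = "sketch-writer") {
        try {
            while (true) {
                val record = queue.take()  // blocks until a record is available
                println("write: $record")  // stands in for writer.write(record)
            }
        } catch (e: InterruptedException) {
            // Interrupt is the shutdown signal; do not rethrow.
        } finally {
            println("close writer")        // stands in for writer.close()
        }
    }

    repeat(3) { queue.put("record-$it") }

    // Mirrors close(): wait for the queue to drain, then interrupt the writer.
    while (queue.isNotEmpty()) Thread.sleep(10)
    writerThread.interrupt()
    writerThread.join()
}
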
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
new file mode 100644
index 00000000..0a7718e9
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
@@ -0,0 +1,290 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.Statistics
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.io.api.Binary
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param traceFile The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20ParquetTraceReader(
+ traceFile: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ selectedVms: List<String>,
+ random: Random
+) : TraceReader<VmWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>>
+
+ /**
+ * The intermediate buffer to store the read records in.
+ */
+ private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024)
+
+ /**
+ * An optional filter for filtering the selected VMs
+ */
+ private val filter =
+ if (selectedVms.isEmpty())
+ null
+ else
+ FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
+
+ /**
+ * A poisonous fragment.
+ */
+ private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0))
+
+ /**
+ * The thread to read the records in.
+ */
+ private val readerThread = thread(start = true, name = "sc20-reader") {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ try {
+ while (true) {
+ val record = reader.read()
+
+ if (record == null) {
+ queue.put(poison)
+ break
+ }
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ queue.put(id to fragment)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Fill the buffers with the VMs
+ */
+ private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) {
+ if (!hasNext) {
+ return
+ }
+
+ val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>()
+ queue.drainTo(fragments)
+
+ for ((id, fragment) in fragments) {
+ if (id == poison.first) {
+ hasNext = false
+ return
+ }
+ buffers[id]?.forEach { it.add(fragment) }
+ }
+ }
+
+ /**
+ * A flag to indicate whether the reader has more entries.
+ */
+ private var hasNext: Boolean = true
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val takenIds = mutableSetOf<UUID>()
+ val entries = mutableMapOf<String, GenericData.Record>()
+ val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>()
+
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ entries[id] = record
+ }
+
+ metaReader.close()
+
+ val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+
+ // Create the entry iterator
+ iterator = selection.asSequence()
+ .mapNotNull { entries[it] }
+ .mapIndexed { index, record ->
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
+
+ assert(uid !in takenIds)
+ takenIds += uid
+
+ println(id)
+
+ val internalBuffer = mutableListOf<FlopsHistoryFragment>()
+ val externalBuffer = mutableListOf<FlopsHistoryFragment>()
+ buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
+ val fragments = sequence<FlopsHistoryFragment> {
+ repeat@while (true) {
+ if (externalBuffer.isEmpty()) {
+ if (hasNext) {
+ pull(buffers)
+ continue
+ } else {
+ break
+ }
+ }
+
+ internalBuffer.addAll(externalBuffer)
+ externalBuffer.clear()
+
+ for (fragment in internalBuffer) {
+ yield(fragment)
+
+ if (fragment.tick >= endTime) {
+ break@repeat
+ }
+ }
+
+ internalBuffer.clear()
+ }
+
+ buffers.remove(id)
+ }
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
+ Random(random.nextInt())
+ )
+ val vmWorkload = VmWorkload(
+ uid, "VM Workload $id", UnnamedUser,
+ VmImage(
+ uid,
+ id,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ fragments,
+ maxCores,
+ requiredMemory
+ )
+ )
+
+ TraceEntryImpl(submissionTime, vmWorkload)
+ }
+ .sortedBy { it.submissionTime }
+ .toList()
+ .iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {
+ readerThread.interrupt()
+ }
+
+ private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
+ override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
+
+ override fun canDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
+ }
+
+ override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
+ }
+ }
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ private data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
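
The SelectedVmFilter above lets Parquet skip row groups by comparing the id column's min/max statistics against the sorted VM selection; the range test itself can be shown in isolation (the ids below are placeholders):

import java.util.TreeSet

// A row group whose id column spans [min, max] can be dropped when no selected VM id
// falls inside that range; appending "\u0000" makes subSet() treat max as inclusive.
fun canDrop(selectedVms: TreeSet<String>, min: String, max: String): Boolean =
    selectedVms.subSet(min, max + "\u0000").isEmpty()

fun main() {
    val selected = TreeSet(listOf("vm-105", "vm-220"))
    println(canDrop(selected, "vm-000", "vm-099")) // true: nothing selected in this range
    println(canDrop(selected, "vm-100", "vm-199")) // false: vm-105 falls inside
}
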
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index c75bde30..028cfb9a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -29,7 +29,6 @@ import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
@@ -45,7 +44,6 @@ import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
-import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
@@ -70,13 +68,15 @@ class ExperimentParameters(parser: ArgParser) {
val environmentFile by parser.storing("path to the environment file")
val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null }
val outputFile by parser.storing("path to where the output should be stored")
- .default { "sc20-experiment-results.csv" }
+ .default { "data/results-${System.currentTimeMillis()}.parquet" }
val selectedVms by parser.storing("the VMs to run") { parseVMs(this) }
.default { emptyList() }
val selectedVmsFile by parser.storing("path to a file containing the VMs to run") {
parseVMs(FileReader(File(this)).readText())
}
.default { emptyList() }
+ val seed by parser.storing("the random seed") { toInt() }
+ .default(0)
val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures")
val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem")
@@ -118,6 +118,14 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
ArgParser(args).parseInto(::ExperimentParameters).run {
+ println("trace-directory: $traceDirectory")
+ println("environment-file: $environmentFile")
+ println("performance-interference-file: $performanceInterferenceFile")
+ println("selected-vms-file: $selectedVmsFile")
+ println("seed: $seed")
+ println("failures: $failures")
+ println("allocation-policy: $allocationPolicy")
+
val start = System.currentTimeMillis()
val monitor = Sc20Monitor(outputFile)
@@ -135,7 +143,7 @@ fun main(args: Array<String>) {
"active-servers-inv" to NumberOfActiveServersAllocationPolicy(true),
"provisioned-cores" to ProvisionedCoresAllocationPolicy(),
"provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true),
- "random" to RandomAllocationPolicy()
+ "random" to RandomAllocationPolicy(Random(seed))
)
if (allocationPolicy !in allocationPolicies) {
@@ -174,20 +182,16 @@ fun main(args: Array<String>) {
delay(10)
val hypervisors = scheduler.drivers()
- var availableHypervisors = hypervisors.size
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server)
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
hypervisor.server.events
.onEach { event ->
when (event) {
is ServerEvent.StateChanged -> {
- monitor.serverStateChanged(hypervisor, event.server)
-
- if (event.server.state == ServerState.ERROR)
- availableHypervisors -= 1
+ monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
}
}
}
@@ -204,7 +208,11 @@ fun main(args: Array<String>) {
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.hostServer
+ event.hostServer,
+ scheduler.submittedVms,
+ scheduler.queuedVms,
+ scheduler.runningVms,
+ scheduler.finishedVms
)
}
}
@@ -216,7 +224,7 @@ fun main(args: Array<String>) {
val domain = root.newDomain(name = "failures")
domain.launch {
chan.receive()
- val random = Random(0)
+ val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
for (node in bareMetalProvisioner.nodes()) {
val cluster = node.metadata[NODE_CLUSTER] as String
@@ -229,15 +237,13 @@ fun main(args: Array<String>) {
null
}
- val finish = Channel<Unit>(Channel.RENDEZVOUS)
-
- var submitted = 0
- var finished = 0
- val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList())
+ var submitted = 0L
+ val finished = Channel<Unit>(Channel.RENDEZVOUS)
+ val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
while (reader.hasNext()) {
val (time, workload) = reader.next()
- delay(max(0, time - simulationContext.clock.millis()))
submitted++
+ delay(max(0, time - simulationContext.clock.millis()))
launch {
chan.send(Unit)
val server = scheduler.deploy(
@@ -247,27 +253,26 @@ fun main(args: Array<String>) {
// Monitor server events
server.events
.onEach {
- if (it is ServerEvent.StateChanged)
+ if (it is ServerEvent.StateChanged) {
monitor.onVmStateChanged(it.server)
-
- // Detect whether the VM has finished running
- if (it.server.state == ServerState.SHUTOFF) {
- finished++
}
- if (finished == submitted && !reader.hasNext()) {
- finish.send(Unit)
- }
+ finished.send(Unit)
}
.collect()
}
}
- finish.receive()
- scheduler.terminate()
+ while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) {
+ finished.receive()
+ }
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
failureDomain?.cancel()
- println(simulationContext.clock.instant())
- println("${System.currentTimeMillis() - start} milliseconds")
+ scheduler.terminate()
+ reader.close()
+ println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
}
runBlocking {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
new file mode 100644
index 00000000..d005a157
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
@@ -0,0 +1,193 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+fun main() {
+ val metaSchema = SchemaBuilder
+ .record("meta")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("submissionTime").type().longType().noDefault()
+ .name("endTime").type().longType().noDefault()
+ .name("maxCores").type().intType().noDefault()
+ .name("requiredMemory").type().longType().noDefault()
+ .endRecord()
+ val schema = SchemaBuilder
+ .record("trace")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("flops").type().longType().noDefault()
+ .endRecord()
+
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val vmIdCol = 19
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ val dest = File("../traces/solvinity/small-parquet")
+ val traceDirectory = File("../traces/solvinity/small")
+ val vms =
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
+ .withSchema(metaSchema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ val allFragments = mutableListOf<Fragment>()
+
+ vms
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+
+ vmId = vmFile.name
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores)
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+
+ writer.close()
+ metaWriter.close()
+}
+
+data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)
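
Both the converter above and the updated Sc20TraceReader collapse consecutive idle samples (flops == 0) into a single longer fragment; a self-contained sketch of that merging rule with a simplified fragment type (Frag and mergeIdle are illustrative names, not part of the commit):

data class Frag(val tick: Long, val flops: Long, val duration: Long)

fun mergeIdle(samples: List<Frag>): Sequence<Frag> = sequence {
    var last: Frag? = null
    for (s in samples) {
        last = if (last != null && last!!.flops == 0L && s.flops == 0L) {
            // Extend the running idle fragment instead of emitting a new one.
            last!!.copy(duration = last!!.duration + s.duration)
        } else {
            last?.let { yield(it) }
            s
        }
    }
    last?.let { yield(it) }
}

fun main() {
    val interval = 5 * 60 * 1000L // one 5-minute trace slice
    val samples = listOf(
        Frag(0, 100, interval),
        Frag(1 * interval, 0, interval),
        Frag(2 * interval, 0, interval), // merged into the previous idle fragment
        Frag(3 * interval, 50, interval)
    )
    mergeIdle(samples).forEach(::println) // busy, one 10-minute idle fragment, busy
}
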
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 89a59e1c..2ef0db97 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -41,6 +41,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
+import java.util.Random
import java.util.UUID
/**
@@ -67,6 +68,7 @@ class Sc20ClusterEnvironmentReader(
var coresPerHost: Int
val nodes = mutableListOf<SimpleBareMetalDriver>()
+ val random = Random(0)
BufferedReader(FileReader(environmentFile)).use { reader ->
reader.lineSequence()
@@ -87,6 +89,7 @@ class Sc20ClusterEnvironmentReader(
return@forEachIndexed
}
+ clusterIdx++
clusterId = values[clusterIdCol].trim()
speed = values[speedCol].trim().toDouble() * 1000.0
numberOfHosts = values[numberOfHostsCol].trim().toInt()
@@ -100,7 +103,7 @@ class Sc20ClusterEnvironmentReader(
nodes.add(
SimpleBareMetalDriver(
dom.newDomain("node-$clusterId-$it"),
- UUID((clusterIdx++).toLong(), it.toLong()),
+ UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$it",
mapOf(NODE_CLUSTER to clusterId),
List(coresPerHost) { coreId ->
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
index f9ebba3d..a653e643 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.trace
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import java.io.Closeable
/**
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
index daa1fdf8..8562cefe 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
@@ -24,13 +24,14 @@
package com.atlarge.opendc.format.trace.sc20
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModelItem
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem
import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import java.io.InputStream
+import java.util.TreeSet
/**
* A parser for the JSON performance interference setup files used for the SC20 paper.
@@ -49,7 +50,7 @@ class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper
return PerformanceInterferenceModel(
performanceInterferenceModel.map { item ->
PerformanceInterferenceModelItem(
- item.vms.toSet(),
+ TreeSet(item.vms),
item.minServerLoad,
item.performanceScore
)
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index c40cb039..2e2159ba 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
-import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
@@ -37,6 +37,8 @@ import java.io.File
import java.io.FileReader
import java.util.UUID
import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
/**
* A [TraceReader] for the internal VM workload trace format.
@@ -47,7 +49,8 @@ import kotlin.math.max
class Sc20TraceReader(
traceDirectory: File,
performanceInterferenceModel: PerformanceInterferenceModel,
- selectedVms: List<String>
+ selectedVms: List<String>,
+ random: Random
) : TraceReader<VmWorkload> {
/**
* The internal iterator to use for this reader.
@@ -81,10 +84,13 @@ class Sc20TraceReader(
vms
.forEachIndexed { idx, vmFile ->
println(vmFile)
- val flopsHistory = mutableListOf<FlopsHistoryFragment>()
+
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
+ var timestamp = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
@@ -96,54 +102,82 @@ class Sc20TraceReader(
val values = line.split(" ")
vmId = vmFile.name
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
- val cores = values[coreCol].trim().toInt()
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ }
+ }
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
- } else {
- // Restrict merging to empty fragments for now
- if (flopsHistory.last().flops == 0L && flops == 0L) {
- val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
- flopsHistory.add(
+ val flopsFragments = sequence {
+ var last: FlopsHistoryFragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ // val res = ArrayList<FlopsHistoryFragment>(lines.size)
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
FlopsHistoryFragment(
oldFragment.tick,
oldFragment.flops + flops,
oldFragment.duration + traceInterval,
cpuUsage,
- cores)
- )
- } else {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
+ cores
+ )
+ } else {
+ val fragment =
+ FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
}
+ // yieldAll(res)
}
+
+ if (last != null) {
+ yield(last!!)
}
+ }
}
val uuid = UUID(0, idx.toLong())
- val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet()
- )
-
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(),
+ Random(random.nextInt())
+ )
val vmWorkload = VmWorkload(
uuid, "VM Workload $vmId", UnnamedUser,
VmImage(
uuid,
vmId,
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsFragments.asSequence(),
maxCores,
requiredMemory
)
)
entries[uuid] = TraceEntryImpl(
- flopsHistory.firstOrNull()?.tick ?: -1,
+ minTime,
vmWorkload
)
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
index fbe77654..fe1049d9 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
-import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
@@ -123,9 +123,10 @@ class VmTraceReader(
val uuid = UUID(0L, vmId)
- val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet()
- )
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet()
+ )
val vmWorkload = VmWorkload(
uuid, "VM Workload $vmId", UnnamedUser,
@@ -133,7 +134,7 @@ class VmTraceReader(
uuid,
vmId.toString(),
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsHistory.asSequence(),
cores,
requiredMemory
)