summaryrefslogtreecommitdiff
path: root/opendc/opendc-format
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-20 15:21:35 +0200
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-20 15:21:35 +0200
commit6981d5581e2ce5c6df42dfbf133c350bd9c35a0f (patch)
tree74e8993babb28dd56950f1da69eda2d97735a0e8 /opendc/opendc-format
parent60372f0022d423efd5267ef4008d9afcbe870911 (diff)
parent3e056406616860c77168d827f1ca9d8d3c79c08e (diff)
Merge branch 'bug/experiment-issues' into '2.x'
Address issues during experiments See merge request opendc/opendc-simulator!61
Diffstat (limited to 'opendc/opendc-format')
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt5
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt2
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt7
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt86
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt13
5 files changed, 76 insertions, 37 deletions
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 89a59e1c..2ef0db97 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -41,6 +41,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
+import java.util.Random
import java.util.UUID
/**
@@ -67,6 +68,7 @@ class Sc20ClusterEnvironmentReader(
var coresPerHost: Int
val nodes = mutableListOf<SimpleBareMetalDriver>()
+ val random = Random(0)
BufferedReader(FileReader(environmentFile)).use { reader ->
reader.lineSequence()
@@ -87,6 +89,7 @@ class Sc20ClusterEnvironmentReader(
return@forEachIndexed
}
+ clusterIdx++
clusterId = values[clusterIdCol].trim()
speed = values[speedCol].trim().toDouble() * 1000.0
numberOfHosts = values[numberOfHostsCol].trim().toInt()
@@ -100,7 +103,7 @@ class Sc20ClusterEnvironmentReader(
nodes.add(
SimpleBareMetalDriver(
dom.newDomain("node-$clusterId-$it"),
- UUID((clusterIdx++).toLong(), it.toLong()),
+ UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$it",
mapOf(NODE_CLUSTER to clusterId),
List(coresPerHost) { coreId ->
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
index f9ebba3d..a653e643 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.format.trace
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import java.io.Closeable
/**
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
index daa1fdf8..8562cefe 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt
@@ -24,13 +24,14 @@
package com.atlarge.opendc.format.trace.sc20
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModelItem
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem
import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import java.io.InputStream
+import java.util.TreeSet
/**
* A parser for the JSON performance interference setup files used for the SC20 paper.
@@ -49,7 +50,7 @@ class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper
return PerformanceInterferenceModel(
performanceInterferenceModel.map { item ->
PerformanceInterferenceModelItem(
- item.vms.toSet(),
+ TreeSet(item.vms),
item.minServerLoad,
item.performanceScore
)
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index c40cb039..2e2159ba 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
-import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
@@ -37,6 +37,8 @@ import java.io.File
import java.io.FileReader
import java.util.UUID
import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
/**
* A [TraceReader] for the internal VM workload trace format.
@@ -47,7 +49,8 @@ import kotlin.math.max
class Sc20TraceReader(
traceDirectory: File,
performanceInterferenceModel: PerformanceInterferenceModel,
- selectedVms: List<String>
+ selectedVms: List<String>,
+ random: Random
) : TraceReader<VmWorkload> {
/**
* The internal iterator to use for this reader.
@@ -81,10 +84,13 @@ class Sc20TraceReader(
vms
.forEachIndexed { idx, vmFile ->
println(vmFile)
- val flopsHistory = mutableListOf<FlopsHistoryFragment>()
+
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
+ var timestamp = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
@@ -96,54 +102,82 @@ class Sc20TraceReader(
val values = line.split(" ")
vmId = vmFile.name
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
- val cores = values[coreCol].trim().toInt()
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ }
+ }
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
- } else {
- // Restrict merging to empty fragments for now
- if (flopsHistory.last().flops == 0L && flops == 0L) {
- val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
- flopsHistory.add(
+ val flopsFragments = sequence {
+ var last: FlopsHistoryFragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ // val res = ArrayList<FlopsHistoryFragment>(lines.size)
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
FlopsHistoryFragment(
oldFragment.tick,
oldFragment.flops + flops,
oldFragment.duration + traceInterval,
cpuUsage,
- cores)
- )
- } else {
- flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
+ cores
+ )
+ } else {
+ val fragment =
+ FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
}
+ // yieldAll(res)
}
+
+ if (last != null) {
+ yield(last!!)
}
+ }
}
val uuid = UUID(0, idx.toLong())
- val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet()
- )
-
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(),
+ Random(random.nextInt())
+ )
val vmWorkload = VmWorkload(
uuid, "VM Workload $vmId", UnnamedUser,
VmImage(
uuid,
vmId,
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsFragments.asSequence(),
maxCores,
requiredMemory
)
)
entries[uuid] = TraceEntryImpl(
- flopsHistory.firstOrNull()?.tick ?: -1,
+ minTime,
vmWorkload
)
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
index fbe77654..fe1049d9 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
-import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
@@ -123,9 +123,10 @@ class VmTraceReader(
val uuid = UUID(0L, vmId)
- val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet()
- )
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet()
+ )
val vmWorkload = VmWorkload(
uuid, "VM Workload $vmId", UnnamedUser,
@@ -133,7 +134,7 @@ class VmTraceReader(
uuid,
vmId.toString(),
mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- flopsHistory,
+ flopsHistory.asSequence(),
cores,
requiredMemory
)