summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt19
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt9
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt128
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt30
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt14
6 files changed, 51 insertions, 151 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index 799de60f..5f8002e2 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -89,11 +89,10 @@ public class ExperimentMetricExporter(
hostMetric.interferedBurst,
hostMetric.cpuUsage,
hostMetric.cpuDemand,
+ hostMetric.powerDraw,
hostMetric.numberOfDeployedImages,
host
)
-
- monitor.reportPowerConsumption(host, hostMetric.powerDraw)
}
}
@@ -104,19 +103,9 @@ public class ExperimentMetricExporter(
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
- block(hostMetric, point.sum)
- }
- }
- }
-
- private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleSumData?.points ?: emptyList()
- for (point in points) {
- val uid = point.labels["host"]
- val hostMetric = hostMetrics[uid]
-
- if (hostMetric != null) {
- block(hostMetric, point.value)
+ // Take the average of the summary
+ val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
+ block(hostMetric, avg)
}
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 5e75c890..68631dee 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -42,11 +42,6 @@ public interface ExperimentMonitor : AutoCloseable {
public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
/**
- * Report the power consumption of a host.
- */
- public fun reportPowerConsumption(host: Host, draw: Double) {}
-
- /**
* This method is invoked for a host for each slice that is finishes.
*/
public fun reportHostSlice(
@@ -57,9 +52,9 @@ public interface ExperimentMonitor : AutoCloseable {
interferedBurst: Long,
cpuUsage: Double,
cpuDemand: Double,
+ powerDraw: Double,
numberOfDeployedImages: Int,
- host: Host,
- duration: Long = 5 * 60 * 1000L
+ host: Host
) {
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index 0e675d87..983b4cff 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -50,52 +50,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
File(base, "provisioner-metrics/$partition/data.parquet"),
bufferSize
)
- private val currentHostEvent = mutableMapOf<Host, HostEvent>()
- private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
- if (startTime < 0) {
- startTime = time
-
- // Update timestamp of initial event
- currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) }
- }
- }
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
logger.debug { "Host ${host.uid} changed state $newState [$time]" }
-
- val previousEvent = currentHostEvent[host]
-
- val roundedTime = previousEvent?.let {
- val duration = time - it.timestamp
- val k = 5 * 60 * 1000L // 5 min in ms
- val rem = duration % k
-
- if (rem == 0L) {
- time
- } else {
- it.timestamp + duration + k - rem
- }
- } ?: time
-
- reportHostSlice(
- roundedTime,
- 0,
- 0,
- 0,
- 0,
- 0.0,
- 0.0,
- 0,
- host
- )
- }
-
- private val lastPowerConsumption = mutableMapOf<Host, Double>()
-
- override fun reportPowerConsumption(host: Host, draw: Double) {
- lastPowerConsumption[host] = draw
}
override fun reportHostSlice(
@@ -106,69 +65,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst: Long,
cpuUsage: Double,
cpuDemand: Double,
+ powerDraw: Double,
numberOfDeployedImages: Int,
- host: Host,
- duration: Long
+ host: Host
) {
- val previousEvent = currentHostEvent[host]
- when {
- previousEvent == null -> {
- val event = HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- lastPowerConsumption[host] ?: 200.0,
- host.model.cpuCount
- )
-
- currentHostEvent[host] = event
- }
- previousEvent.timestamp == time -> {
- val event = HostEvent(
- time,
- previousEvent.duration,
- host,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- lastPowerConsumption[host] ?: 200.0,
- host.model.cpuCount
- )
-
- currentHostEvent[host] = event
- }
- else -> {
- hostWriter.write(previousEvent)
-
- val event = HostEvent(
- time,
- time - previousEvent.timestamp,
- host,
- numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
- cpuUsage,
- cpuDemand,
- lastPowerConsumption[host] ?: 200.0,
- host.model.cpuCount
- )
-
- currentHostEvent[host] = event
- }
- }
+ hostWriter.write(
+ HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ host,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ powerDraw,
+ host.model.cpuCount
+ )
+ )
}
override fun reportProvisionerMetrics(
@@ -196,12 +112,6 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
override fun close() {
- // Flush remaining events
- for ((_, event) in currentHostEvent) {
- hostWriter.write(event)
- }
- currentHostEvent.clear()
-
hostWriter.close()
provisionerWriter.close()
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
index 4a3e7963..c8fe1cb2 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -52,7 +52,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
record.put("interfered_burst", event.interferedBurst)
record.put("cpu_usage", event.cpuUsage)
record.put("cpu_demand", event.cpuDemand)
- record.put("power_draw", event.powerDraw * (1.0 / 12))
+ record.put("power_draw", event.powerDraw)
record.put("cores", event.cores)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index 9ab69572..c5294b55 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -57,8 +57,8 @@ private val logger = KotlinLogging.logger {}
@OptIn(ExperimentalStdlibApi::class)
public class Sc20StreamingParquetTraceReader(
traceFile: File,
- performanceInterferenceModel: PerformanceInterferenceModel,
- selectedVms: List<String>,
+ performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ selectedVms: List<String> = emptyList(),
random: Random
) : TraceReader<SimWorkload> {
/**
@@ -229,20 +229,26 @@ public class Sc20StreamingParquetTraceReader(
buffers.remove(id)
}
val relevantPerformanceInterferenceModelItems =
- PerformanceInterferenceModel(
- performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
- Random(random.nextInt())
- )
+ if (performanceInterferenceModel != null)
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
+ Random(random.nextInt())
+ )
+ else
+ null
val workload = SimTraceWorkload(fragments)
+ val meta = mapOf(
+ "cores" to maxCores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
TraceEntry(
uid, id, submissionTime, workload,
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
+ if (performanceInterferenceModel != null)
+ meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any)
+ else
+ meta
)
}
.sortedBy { it.start }
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 0441cfed..d2e7473f 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -118,9 +118,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(434262255818, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(207388095207, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(204745144701, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(2642950497, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -156,9 +156,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(702636229989, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(172636987071, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(528959213229, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
@@ -197,9 +197,9 @@ class CapelinIntegrationTest {
interferedBurst: Long,
cpuUsage: Double,
cpuDemand: Double,
+ powerDraw: Double,
numberOfDeployedImages: Int,
host: Host,
- duration: Long
) {
totalRequestedBurst += requestedBurst
totalGrantedBurst += grantedBurst