summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-compute/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2023-11-15 15:56:03 +0100
committerGitHub <noreply@github.com>2023-11-15 15:56:03 +0100
commit05141e92582372c816648b94ef3de5e16d1fe5b9 (patch)
tree41e703d5a129948d445d9957392c632ab3f74872 /opendc-experiments/opendc-experiments-compute/src/main
parent2fc71b81ea01072c37ce140d4a47e33a25d65f72 (diff)
parent1513efe07605975cd2f86f0b739bd490e4fc9970 (diff)
Merge pull request #168 from DanteNiewenhuis/greenifier-demo
Updated the simulation to let servers run until they are finished.
Diffstat (limited to 'opendc-experiments/opendc-experiments-compute/src/main')
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt22
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt61
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt90
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt5
4 files changed, 121 insertions, 57 deletions
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
index 9aa544cb..e8596218 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
@@ -45,6 +45,8 @@ import java.io.File
import java.lang.ref.SoftReference
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
+import java.time.Duration
+import java.time.Instant
import kotlin.math.max
import kotlin.math.roundToLong
@@ -82,14 +84,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
while (reader.nextRow()) {
val id = reader.getString(idCol)!!
val time = reader.getInstant(timestampCol)!!
- val duration = reader.getDuration(durationCol)!!
+ val durationMs = reader.getDuration(durationCol)!!
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
- val deadlineMs = time.toEpochMilli()
- val timeMs = (time - duration).toEpochMilli()
val builder = fragments.computeIfAbsent(id) { Builder() }
- builder.add(timeMs, deadlineMs, cpuUsage, cores)
+ builder.add(time, durationMs, cpuUsage, cores)
}
fragments
@@ -246,17 +246,17 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
* @param usage CPU usage of this fragment.
* @param cores Number of cores used.
*/
- fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
- val duration = max(0, deadline - timestamp)
- totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
+ fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) {
+ val startTimeMs = (deadline - duration).toEpochMilli()
+ totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs
- if (timestamp != previousDeadline) {
+ if ((startTimeMs != previousDeadline) && (previousDeadline != Long.MIN_VALUE)) {
// There is a gap between the previous and current fragment; fill the gap
- builder.add(timestamp, 0.0, cores)
+ builder.add(startTimeMs, 0.0, cores)
}
- builder.add(deadline, usage, cores)
- previousDeadline = deadline
+ builder.add(deadline.toEpochMilli(), usage, cores)
+ previousDeadline = deadline.toEpochMilli()
}
/**
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
index 5d4c88cd..059985a8 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
@@ -27,13 +27,47 @@ package org.opendc.experiments.compute
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
+import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.yield
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.service.ComputeService
import java.time.InstantSource
import java.util.Random
import kotlin.coroutines.coroutineContext
import kotlin.math.max
+public class RunningServerWatcher: ServerWatcher {
+
+ private val _mutex: Mutex = Mutex();
+
+ public suspend fun lock () {
+ _mutex.lock()
+ }
+
+ public suspend fun wait () {
+ // TODO: look at the better way to wait for an unlock
+ this.lock();
+ }
+
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ when (newState) {
+ ServerState.TERMINATED -> {
+ _mutex.unlock()
+ }
+ ServerState.ERROR -> {
+ _mutex.unlock()
+ }
+ ServerState.DELETED -> {
+ _mutex.unlock()
+ }
+ else -> {}
+ }
+ }
+
+}
+
/**
* Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished.
*
@@ -63,25 +97,26 @@ public suspend fun ComputeService.replay(
// Start the fault injector
injector?.start()
- var offset = Long.MIN_VALUE
+ var simulationOffset = Long.MIN_VALUE
for (entry in trace.sortedBy { it.startTime }) {
val now = clock.millis()
val start = entry.startTime.toEpochMilli()
- if (offset < 0) {
- offset = start - now
+ // Set the simulationOffset based on the starting time of the first server
+ if (simulationOffset == Long.MIN_VALUE) {
+ simulationOffset = start - now
}
// Make sure the trace entries are ordered by submission time
- assert(start - offset >= 0) { "Invalid trace order" }
+// assert(start - simulationOffset >= 0) { "Invalid trace order" }
+ // Delay the server based on the startTime given by the trace.
if (!submitImmediately) {
- delay(max(0, (start - offset) - now))
+ delay(max(0, (start - now - simulationOffset)));
}
- val workloadOffset = -offset + 300001
- val workload = entry.trace.createWorkload(workloadOffset)
+ val workload = entry.trace.createWorkload(start)
val meta = mutableMapOf<String, Any>("workload" to workload)
val interferenceProfile = entry.interferenceProfile
@@ -102,16 +137,18 @@ public suspend fun ComputeService.replay(
meta = meta
)
- // Wait for the server reach its end time
- val endTime = entry.stopTime.toEpochMilli()
- delay(endTime + workloadOffset - clock.millis() + (5 * 60 * 10000))
+ val serverWatcher = RunningServerWatcher()
+ serverWatcher.lock()
+ server.watch(serverWatcher)
+
+ // Wait until the server is terminated
+ serverWatcher.wait()
// Stop the server after reaching the end-time of the virtual machine
- server.stop()
+ server.delete()
}
}
}
-
yield()
} finally {
injector?.close()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
index de0f0b7c..e8cd3a55 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
@@ -79,46 +79,47 @@ public class ComputeMetricReader(
*/
private val job = scope.launch {
val intervalMs = exportInterval.toMillis()
- val service = service
- val monitor = monitor
- val hostTableReaders = hostTableReaders
- val serverTableReaders = serverTableReaders
- val serviceTableReader = serviceTableReader
-
try {
while (isActive) {
delay(intervalMs)
- try {
- val now = clock.instant()
-
- for (host in service.hosts) {
- val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
- reader.record(now)
- monitor.record(reader.copy())
- reader.reset()
- }
-
- for (server in service.servers) {
- val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
- reader.record(now)
- monitor.record(reader)
- reader.reset()
- }
-
- serviceTableReader.record(now)
- monitor.record(serviceTableReader)
- } catch (cause: Throwable) {
- logger.warn(cause) { "Exporter threw an Exception" }
- }
+ loggState()
}
+
} finally {
+// loggState()
+
if (monitor is AutoCloseable) {
monitor.close()
}
}
}
+ private fun loggState() {
+ try {
+ val now = this.clock.instant()
+
+ for (host in this.service.hosts) {
+ val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
+ for (server in this.service.servers) {
+ val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
+ this.serviceTableReader.record(now)
+ monitor.record(this.serviceTableReader.copy())
+ } catch (cause: Throwable) {
+ this.logger.warn(cause) { "Exporter threw an Exception" }
+ }
+ }
+
override fun close() {
job.cancel()
}
@@ -127,6 +128,27 @@ public class ComputeMetricReader(
* An aggregator for service metrics before they are reported.
*/
private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {
+
+ override fun copy(): ServiceTableReader {
+ val newServiceTable = ServiceTableReaderImpl(service)
+ newServiceTable.setValues(this)
+
+ return newServiceTable
+ }
+
+ override fun setValues(table: ServiceTableReader) {
+ _timestamp = table.timestamp
+
+ _hostsUp = table.hostsUp
+ _hostsDown = table.hostsDown
+ _serversTotal = table.serversTotal
+ _serversPending = table.serversPending
+ _serversActive = table.serversActive
+ _attemptsSuccess = table.attemptsSuccess
+ _attemptsFailure = table.attemptsFailure
+ _attemptsError = table.attemptsError
+ }
+
private var _timestamp: Instant = Instant.MIN
override val timestamp: Instant
get() = _timestamp
@@ -362,18 +384,18 @@ public class ComputeMetricReader(
}
override fun setValues(table: ServerTableReader) {
+ host = table.host
+
_timestamp = table.timestamp
- _uptime = table.uptime
- _downtime = table.downtime
- _provisionTime = table.provisionTime
- _bootTime = table.bootTime
_cpuLimit = table.cpuLimit
_cpuActiveTime = table.cpuActiveTime
_cpuIdleTime = table.cpuIdleTime
_cpuStealTime = table.cpuStealTime
_cpuLostTime = table.cpuLostTime
-
- host = table.host
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _provisionTime = table.provisionTime
+ _bootTime = table.bootTime
}
private val _server = server
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt
index bb926298..a077a476 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt
@@ -28,6 +28,11 @@ import java.time.Instant
* An interface that is used to read a row of a service trace entry.
*/
public interface ServiceTableReader {
+
+ public fun copy(): ServiceTableReader
+
+ public fun setValues(table: ServiceTableReader)
+
/**
* The timestamp of the current entry of the reader.
*/