summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-compute/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2023-11-14 13:28:02 +0100
committerDante Niewenhuis <d.niewenhuis@hotmail.com>2023-11-14 13:32:46 +0100
commitd823cd1eb16d175fb778c9f6c9282aa16f1a25ff (patch)
tree28f0461d8ccbac4db69e6ec748554b879b62f179 /opendc-experiments/opendc-experiments-compute/src/main
parent79c1818e116a7ac72d5210865a528538800bb794 (diff)
Updated TraceReader, Simulation now continues until all tasks are done
Diffstat (limited to 'opendc-experiments/opendc-experiments-compute/src/main')
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt22
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt33
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt57
3 files changed, 57 insertions, 55 deletions
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
index 9aa544cb..e8596218 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt
@@ -45,6 +45,8 @@ import java.io.File
import java.lang.ref.SoftReference
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
+import java.time.Duration
+import java.time.Instant
import kotlin.math.max
import kotlin.math.roundToLong
@@ -82,14 +84,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
while (reader.nextRow()) {
val id = reader.getString(idCol)!!
val time = reader.getInstant(timestampCol)!!
- val duration = reader.getDuration(durationCol)!!
+ val durationMs = reader.getDuration(durationCol)!!
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
- val deadlineMs = time.toEpochMilli()
- val timeMs = (time - duration).toEpochMilli()
val builder = fragments.computeIfAbsent(id) { Builder() }
- builder.add(timeMs, deadlineMs, cpuUsage, cores)
+ builder.add(time, durationMs, cpuUsage, cores)
}
fragments
@@ -246,17 +246,17 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
* @param usage CPU usage of this fragment.
* @param cores Number of cores used.
*/
- fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
- val duration = max(0, deadline - timestamp)
- totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
+ fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) {
+ val startTimeMs = (deadline - duration).toEpochMilli()
+ totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs
- if (timestamp != previousDeadline) {
+ if ((startTimeMs != previousDeadline) && (previousDeadline != Long.MIN_VALUE)) {
// There is a gap between the previous and current fragment; fill the gap
- builder.add(timestamp, 0.0, cores)
+ builder.add(startTimeMs, 0.0, cores)
}
- builder.add(deadline, usage, cores)
- previousDeadline = deadline
+ builder.add(deadline.toEpochMilli(), usage, cores)
+ previousDeadline = deadline.toEpochMilli()
}
/**
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
index 36daa756..059985a8 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
@@ -46,6 +46,11 @@ public class RunningServerWatcher: ServerWatcher {
_mutex.lock()
}
+ public suspend fun wait () {
+ // TODO: look at the better way to wait for an unlock
+ this.lock();
+ }
+
override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
ServerState.TERMINATED -> {
@@ -92,25 +97,26 @@ public suspend fun ComputeService.replay(
// Start the fault injector
injector?.start()
- var offset = Long.MIN_VALUE
+ var simulationOffset = Long.MIN_VALUE
for (entry in trace.sortedBy { it.startTime }) {
val now = clock.millis()
val start = entry.startTime.toEpochMilli()
- if (offset < 0) {
- offset = start - now
+ // Set the simulationOffset based on the starting time of the first server
+ if (simulationOffset == Long.MIN_VALUE) {
+ simulationOffset = start - now
}
// Make sure the trace entries are ordered by submission time
- assert(start - offset >= 0) { "Invalid trace order" }
+// assert(start - simulationOffset >= 0) { "Invalid trace order" }
+ // Delay the server based on the startTime given by the trace.
if (!submitImmediately) {
- delay(max(0, (start - offset) - now))
+ delay(max(0, (start - now - simulationOffset)));
}
- val workloadOffset = -offset + 300001
- val workload = entry.trace.createWorkload(workloadOffset)
+ val workload = entry.trace.createWorkload(start)
val meta = mutableMapOf<String, Any>("workload" to workload)
val interferenceProfile = entry.interferenceProfile
@@ -131,23 +137,18 @@ public suspend fun ComputeService.replay(
meta = meta
)
-
- // Wait for the server reach its end time
-// val endTime = entry.stopTime.toEpochMilli()
-// delay(endTime + workloadOffset - clock.millis() + (5 * 60 * 10000))
-
val serverWatcher = RunningServerWatcher()
-
serverWatcher.lock()
server.watch(serverWatcher)
- serverWatcher.lock()
+
+ // Wait until the server is terminated
+ serverWatcher.wait()
// Stop the server after reaching the end-time of the virtual machine
- server.stop()
+ server.delete()
}
}
}
-
yield()
} finally {
injector?.close()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
index de0f0b7c..ff3bfb8e 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
@@ -79,46 +79,47 @@ public class ComputeMetricReader(
*/
private val job = scope.launch {
val intervalMs = exportInterval.toMillis()
- val service = service
- val monitor = monitor
- val hostTableReaders = hostTableReaders
- val serverTableReaders = serverTableReaders
- val serviceTableReader = serviceTableReader
-
try {
while (isActive) {
delay(intervalMs)
- try {
- val now = clock.instant()
-
- for (host in service.hosts) {
- val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
- reader.record(now)
- monitor.record(reader.copy())
- reader.reset()
- }
-
- for (server in service.servers) {
- val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
- reader.record(now)
- monitor.record(reader)
- reader.reset()
- }
-
- serviceTableReader.record(now)
- monitor.record(serviceTableReader)
- } catch (cause: Throwable) {
- logger.warn(cause) { "Exporter threw an Exception" }
- }
+ loggState()
}
+
} finally {
+ loggState()
+
if (monitor is AutoCloseable) {
monitor.close()
}
}
}
+ private fun loggState() {
+ try {
+ val now = this.clock.instant()
+
+ for (host in this.service.hosts) {
+ val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
+ for (server in this.service.servers) {
+ val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
+ reader.record(now)
+ this.monitor.record(reader)
+ reader.reset()
+ }
+
+ this.serviceTableReader.record(now)
+ monitor.record(this.serviceTableReader)
+ } catch (cause: Throwable) {
+ this.logger.warn(cause) { "Exporter threw an Exception" }
+ }
+ }
+
override fun close() {
job.cancel()
}