From d823cd1eb16d175fb778c9f6c9282aa16f1a25ff Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 14 Nov 2023 13:28:02 +0100 Subject: Updated TraceReader, Simulation now continues until all tasks are done --- .../experiments/compute/ComputeWorkloadLoader.kt | 22 ++++----- .../org/opendc/experiments/compute/TraceHelpers.kt | 33 +++++++------ .../compute/telemetry/ComputeMetricReader.kt | 57 +++++++++++----------- 3 files changed, 57 insertions(+), 55 deletions(-) (limited to 'opendc-experiments/opendc-experiments-compute/src/main') diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt index 9aa544cb..e8596218 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt @@ -45,6 +45,8 @@ import java.io.File import java.lang.ref.SoftReference import java.util.UUID import java.util.concurrent.ConcurrentHashMap +import java.time.Duration +import java.time.Instant import kotlin.math.max import kotlin.math.roundToLong @@ -82,14 +84,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { while (reader.nextRow()) { val id = reader.getString(idCol)!! val time = reader.getInstant(timestampCol)!! - val duration = reader.getDuration(durationCol)!! + val durationMs = reader.getDuration(durationCol)!! val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val deadlineMs = time.toEpochMilli() - val timeMs = (time - duration).toEpochMilli() val builder = fragments.computeIfAbsent(id) { Builder() } - builder.add(timeMs, deadlineMs, cpuUsage, cores) + builder.add(time, durationMs, cpuUsage, cores) } fragments @@ -246,17 +246,17 @@ public class ComputeWorkloadLoader(private val baseDir: File) { * @param usage CPU usage of this fragment. * @param cores Number of cores used. */ - fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { - val duration = max(0, deadline - timestamp) - totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs + fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) { + val startTimeMs = (deadline - duration).toEpochMilli() + totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs - if (timestamp != previousDeadline) { + if ((startTimeMs != previousDeadline) && (previousDeadline != Long.MIN_VALUE)) { // There is a gap between the previous and current fragment; fill the gap - builder.add(timestamp, 0.0, cores) + builder.add(startTimeMs, 0.0, cores) } - builder.add(deadline, usage, cores) - previousDeadline = deadline + builder.add(deadline.toEpochMilli(), usage, cores) + previousDeadline = deadline.toEpochMilli() } /** diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt index 36daa756..059985a8 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt @@ -46,6 +46,11 @@ public class RunningServerWatcher: ServerWatcher { _mutex.lock() } + public suspend fun wait () { + // TODO: look at the better way to wait for an unlock + this.lock(); + } + override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { ServerState.TERMINATED -> { @@ -92,25 +97,26 @@ public suspend fun ComputeService.replay( // Start the fault injector injector?.start() - var offset = Long.MIN_VALUE + var simulationOffset = Long.MIN_VALUE for (entry in trace.sortedBy { it.startTime }) { val now = clock.millis() val start = entry.startTime.toEpochMilli() - if (offset < 0) { - offset = start - now + // Set the simulationOffset based on the starting time of the first server + if (simulationOffset == Long.MIN_VALUE) { + simulationOffset = start - now } // Make sure the trace entries are ordered by submission time - assert(start - offset >= 0) { "Invalid trace order" } +// assert(start - simulationOffset >= 0) { "Invalid trace order" } + // Delay the server based on the startTime given by the trace. if (!submitImmediately) { - delay(max(0, (start - offset) - now)) + delay(max(0, (start - now - simulationOffset))); } - val workloadOffset = -offset + 300001 - val workload = entry.trace.createWorkload(workloadOffset) + val workload = entry.trace.createWorkload(start) val meta = mutableMapOf("workload" to workload) val interferenceProfile = entry.interferenceProfile @@ -131,23 +137,18 @@ public suspend fun ComputeService.replay( meta = meta ) - - // Wait for the server reach its end time -// val endTime = entry.stopTime.toEpochMilli() -// delay(endTime + workloadOffset - clock.millis() + (5 * 60 * 10000)) - val serverWatcher = RunningServerWatcher() - serverWatcher.lock() server.watch(serverWatcher) - serverWatcher.lock() + + // Wait until the server is terminated + serverWatcher.wait() // Stop the server after reaching the end-time of the virtual machine - server.stop() + server.delete() } } } - yield() } finally { injector?.close() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt index de0f0b7c..ff3bfb8e 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -79,46 +79,47 @@ public class ComputeMetricReader( */ private val job = scope.launch { val intervalMs = exportInterval.toMillis() - val service = service - val monitor = monitor - val hostTableReaders = hostTableReaders - val serverTableReaders = serverTableReaders - val serviceTableReader = serviceTableReader - try { while (isActive) { delay(intervalMs) - try { - val now = clock.instant() - - for (host in service.hosts) { - val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } - reader.record(now) - monitor.record(reader.copy()) - reader.reset() - } - - for (server in service.servers) { - val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } - reader.record(now) - monitor.record(reader) - reader.reset() - } - - serviceTableReader.record(now) - monitor.record(serviceTableReader) - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - } + loggState() } + } finally { + loggState() + if (monitor is AutoCloseable) { monitor.close() } } } + private fun loggState() { + try { + val now = this.clock.instant() + + for (host in this.service.hosts) { + val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + + for (server in this.service.servers) { + val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + this.monitor.record(reader) + reader.reset() + } + + this.serviceTableReader.record(now) + monitor.record(this.serviceTableReader) + } catch (cause: Throwable) { + this.logger.warn(cause) { "Exporter threw an Exception" } + } + } + override fun close() { job.cancel() } -- cgit v1.2.3