diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-compute/src/main')
4 files changed, 121 insertions, 57 deletions
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt index 9aa544cb..e8596218 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeWorkloadLoader.kt @@ -45,6 +45,8 @@ import java.io.File import java.lang.ref.SoftReference import java.util.UUID import java.util.concurrent.ConcurrentHashMap +import java.time.Duration +import java.time.Instant import kotlin.math.max import kotlin.math.roundToLong @@ -82,14 +84,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { while (reader.nextRow()) { val id = reader.getString(idCol)!! val time = reader.getInstant(timestampCol)!! - val duration = reader.getDuration(durationCol)!! + val durationMs = reader.getDuration(durationCol)!! val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val deadlineMs = time.toEpochMilli() - val timeMs = (time - duration).toEpochMilli() val builder = fragments.computeIfAbsent(id) { Builder() } - builder.add(timeMs, deadlineMs, cpuUsage, cores) + builder.add(time, durationMs, cpuUsage, cores) } fragments @@ -246,17 +246,17 @@ public class ComputeWorkloadLoader(private val baseDir: File) { * @param usage CPU usage of this fragment. * @param cores Number of cores used. */ - fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { - val duration = max(0, deadline - timestamp) - totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs + fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) { + val startTimeMs = (deadline - duration).toEpochMilli() + totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs - if (timestamp != previousDeadline) { + if ((startTimeMs != previousDeadline) && (previousDeadline != Long.MIN_VALUE)) { // There is a gap between the previous and current fragment; fill the gap - builder.add(timestamp, 0.0, cores) + builder.add(startTimeMs, 0.0, cores) } - builder.add(deadline, usage, cores) - previousDeadline = deadline + builder.add(deadline.toEpochMilli(), usage, cores) + previousDeadline = deadline.toEpochMilli() } /** diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt index 5d4c88cd..059985a8 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt @@ -27,13 +27,47 @@ package org.opendc.experiments.compute import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.yield +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher import org.opendc.compute.service.ComputeService import java.time.InstantSource import java.util.Random import kotlin.coroutines.coroutineContext import kotlin.math.max +public class RunningServerWatcher: ServerWatcher { + + private val _mutex: Mutex = Mutex(); + + public suspend fun lock () { + _mutex.lock() + } + + public suspend fun wait () { + // TODO: look at the better way to wait for an unlock + this.lock(); + } + + override fun onStateChanged(server: Server, newState: ServerState) { + when (newState) { + ServerState.TERMINATED -> { + _mutex.unlock() + } + ServerState.ERROR -> { + _mutex.unlock() + } + ServerState.DELETED -> { + _mutex.unlock() + } + else -> {} + } + } + +} + /** * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. * @@ -63,25 +97,26 @@ public suspend fun ComputeService.replay( // Start the fault injector injector?.start() - var offset = Long.MIN_VALUE + var simulationOffset = Long.MIN_VALUE for (entry in trace.sortedBy { it.startTime }) { val now = clock.millis() val start = entry.startTime.toEpochMilli() - if (offset < 0) { - offset = start - now + // Set the simulationOffset based on the starting time of the first server + if (simulationOffset == Long.MIN_VALUE) { + simulationOffset = start - now } // Make sure the trace entries are ordered by submission time - assert(start - offset >= 0) { "Invalid trace order" } +// assert(start - simulationOffset >= 0) { "Invalid trace order" } + // Delay the server based on the startTime given by the trace. if (!submitImmediately) { - delay(max(0, (start - offset) - now)) + delay(max(0, (start - now - simulationOffset))); } - val workloadOffset = -offset + 300001 - val workload = entry.trace.createWorkload(workloadOffset) + val workload = entry.trace.createWorkload(start) val meta = mutableMapOf<String, Any>("workload" to workload) val interferenceProfile = entry.interferenceProfile @@ -102,16 +137,18 @@ public suspend fun ComputeService.replay( meta = meta ) - // Wait for the server reach its end time - val endTime = entry.stopTime.toEpochMilli() - delay(endTime + workloadOffset - clock.millis() + (5 * 60 * 10000)) + val serverWatcher = RunningServerWatcher() + serverWatcher.lock() + server.watch(serverWatcher) + + // Wait until the server is terminated + serverWatcher.wait() // Stop the server after reaching the end-time of the virtual machine - server.stop() + server.delete() } } } - yield() } finally { injector?.close() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt index de0f0b7c..e8cd3a55 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -79,46 +79,47 @@ public class ComputeMetricReader( */ private val job = scope.launch { val intervalMs = exportInterval.toMillis() - val service = service - val monitor = monitor - val hostTableReaders = hostTableReaders - val serverTableReaders = serverTableReaders - val serviceTableReader = serviceTableReader - try { while (isActive) { delay(intervalMs) - try { - val now = clock.instant() - - for (host in service.hosts) { - val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } - reader.record(now) - monitor.record(reader.copy()) - reader.reset() - } - - for (server in service.servers) { - val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } - reader.record(now) - monitor.record(reader) - reader.reset() - } - - serviceTableReader.record(now) - monitor.record(serviceTableReader) - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - } + loggState() } + } finally { +// loggState() + if (monitor is AutoCloseable) { monitor.close() } } } + private fun loggState() { + try { + val now = this.clock.instant() + + for (host in this.service.hosts) { + val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + + for (server in this.service.servers) { + val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } + + this.serviceTableReader.record(now) + monitor.record(this.serviceTableReader.copy()) + } catch (cause: Throwable) { + this.logger.warn(cause) { "Exporter threw an Exception" } + } + } + override fun close() { job.cancel() } @@ -127,6 +128,27 @@ public class ComputeMetricReader( * An aggregator for service metrics before they are reported. */ private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + + override fun copy(): ServiceTableReader { + val newServiceTable = ServiceTableReaderImpl(service) + newServiceTable.setValues(this) + + return newServiceTable + } + + override fun setValues(table: ServiceTableReader) { + _timestamp = table.timestamp + + _hostsUp = table.hostsUp + _hostsDown = table.hostsDown + _serversTotal = table.serversTotal + _serversPending = table.serversPending + _serversActive = table.serversActive + _attemptsSuccess = table.attemptsSuccess + _attemptsFailure = table.attemptsFailure + _attemptsError = table.attemptsError + } + private var _timestamp: Instant = Instant.MIN override val timestamp: Instant get() = _timestamp @@ -362,18 +384,18 @@ public class ComputeMetricReader( } override fun setValues(table: ServerTableReader) { + host = table.host + _timestamp = table.timestamp - _uptime = table.uptime - _downtime = table.downtime - _provisionTime = table.provisionTime - _bootTime = table.bootTime _cpuLimit = table.cpuLimit _cpuActiveTime = table.cpuActiveTime _cpuIdleTime = table.cpuIdleTime _cpuStealTime = table.cpuStealTime _cpuLostTime = table.cpuLostTime - - host = table.host + _uptime = table.uptime + _downtime = table.downtime + _provisionTime = table.provisionTime + _bootTime = table.bootTime } private val _server = server diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt index bb926298..a077a476 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServiceTableReader.kt @@ -28,6 +28,11 @@ import java.time.Instant * An interface that is used to read a row of a service trace entry. */ public interface ServiceTableReader { + + public fun copy(): ServiceTableReader + + public fun setValues(table: ServiceTableReader) + /** * The timestamp of the current entry of the reader. */ |
