From 6660f4170d3afebd7c778dc352cb1a2d55017dc5 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Mon, 17 Feb 2020 14:22:30 +0100 Subject: feat: Implement VM support This change adds support for virtual machines and hypervisors to the _opendc-compute_ module. Moreover, this change also includes VM trace reading capabilities. --- .../environment/sc18/Sc18EnvironmentReader.kt | 10 +- .../opendc/format/trace/vm/VmTraceReader.kt | 146 +++++++++++++++++++++ 2 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt (limited to 'opendc/opendc-format/src/main') diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 4d2f9e85..55061492 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -37,9 +37,9 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import kotlinx.coroutines.runBlocking import java.io.InputStream import java.util.UUID -import kotlinx.coroutines.runBlocking /** * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Datacenter @@ -87,9 +87,11 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb val serviceRegistry = ServiceRegistryImpl() serviceRegistry[ProvisioningService.Key] = provisioningService - val platform = Platform(UUID.randomUUID(), "sc18-platform", listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - )) + val platform = Platform( + UUID.randomUUID(), "sc18-platform", listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) environment = Environment(setup.name, null, listOf(platform)) } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt new file mode 100644 index 00000000..b5c6ca0d --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -0,0 +1,146 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.vm + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for the VM workload trace format. + * + * @param traceDirectory The directory of the traces. + */ +class VmTraceReader(traceDirectory: File) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + + var timestampCol = 0 + var coreCol = 0 + var cpuUsageCol = 0 + val traceInterval = 5 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .forEach { vmFile -> + println(vmFile) + val flopsHistory = mutableListOf() + var vmId = -1L + var cores = -1 + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";\t") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + timestampCol = header["Timestamp [ms]"]!! + coreCol = header["CPU cores"]!! + cpuUsageCol = header["CPU usage [MHZ]"]!! + return@forEachIndexed + } + + vmId = vmFile.nameWithoutExtension.trim().toLong() + val timestamp = values[timestampCol].trim().toLong() - 5 * 60 + cores = values[coreCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() + + val flops: Long = (cpuUsage * cores * 1_000_000L * 5 * 60).toLong() + + if (flopsHistory.isEmpty()) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval)) + } else { + if (flopsHistory.last().flops != flops) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval)) + } else { + val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) + flopsHistory.add( + FlopsHistoryFragment( + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval + ) + ) + } + } + } + } + + val vmWorkload = VmWorkload( + UUID(0L, vmId), "", UnnamedUser, + VmImage(flopsHistory, cores) + ) + entries[vmId] = TraceEntryImpl( + flopsHistory.firstOrNull()?.tick ?: -1, + vmWorkload + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} -- cgit v1.2.3