summaryrefslogtreecommitdiff
path: root/opendc/opendc-format/src/main
diff options
context:
space:
mode:
authorGeorgios Andreadis <info@gandreadis.com>2020-02-17 14:22:30 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-20 22:23:38 +0100
commit6660f4170d3afebd7c778dc352cb1a2d55017dc5 (patch)
tree679e08e71dd14efc813030c1606d79ff95489478 /opendc/opendc-format/src/main
parent04e4bddccc4e06a126f3c6ee2878502323c7116e (diff)
feat: Implement VM support
This change adds support for virtual machines and hypervisors to the _opendc-compute_ module. Moreover, this change also includes VM trace reading capabilities.
Diffstat (limited to 'opendc/opendc-format/src/main')
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt10
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt146
2 files changed, 152 insertions, 4 deletions
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 4d2f9e85..55061492 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -37,9 +37,9 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
+import kotlinx.coroutines.runBlocking
import java.io.InputStream
import java.util.UUID
-import kotlinx.coroutines.runBlocking
/**
* A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Datacenter
@@ -87,9 +87,11 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
val serviceRegistry = ServiceRegistryImpl()
serviceRegistry[ProvisioningService.Key] = provisioningService
- val platform = Platform(UUID.randomUUID(), "sc18-platform", listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- ))
+ val platform = Platform(
+ UUID.randomUUID(), "sc18-platform", listOf(
+ Zone(UUID.randomUUID(), "zone", serviceRegistry)
+ )
+ )
environment = Environment(setup.name, null, listOf(platform))
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
new file mode 100644
index 00000000..b5c6ca0d
--- /dev/null
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
@@ -0,0 +1,146 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.format.trace.vm
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.UUID
+
+/**
+ * A [TraceReader] for the VM workload trace format.
+ *
+ * @param traceDirectory The directory of the traces.
+ */
+class VmTraceReader(traceDirectory: File) : TraceReader<VmWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>>
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>()
+
+ var timestampCol = 0
+ var coreCol = 0
+ var cpuUsageCol = 0
+ val traceInterval = 5 * 60 * 1000L
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .forEach { vmFile ->
+ println(vmFile)
+ val flopsHistory = mutableListOf<FlopsHistoryFragment>()
+ var vmId = -1L
+ var cores = -1
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the trace
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEachIndexed { idx, line ->
+ val values = line.split(";\t")
+
+ // Parse GWF header
+ if (idx == 0) {
+ val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
+ timestampCol = header["Timestamp [ms]"]!!
+ coreCol = header["CPU cores"]!!
+ cpuUsageCol = header["CPU usage [MHZ]"]!!
+ return@forEachIndexed
+ }
+
+ vmId = vmFile.nameWithoutExtension.trim().toLong()
+ val timestamp = values[timestampCol].trim().toLong() - 5 * 60
+ cores = values[coreCol].trim().toInt()
+ val cpuUsage = values[cpuUsageCol].trim().toDouble()
+
+ val flops: Long = (cpuUsage * cores * 1_000_000L * 5 * 60).toLong()
+
+ if (flopsHistory.isEmpty()) {
+ flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval))
+ } else {
+ if (flopsHistory.last().flops != flops) {
+ flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval))
+ } else {
+ val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
+ flopsHistory.add(
+ FlopsHistoryFragment(
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval
+ )
+ )
+ }
+ }
+ }
+ }
+
+ val vmWorkload = VmWorkload(
+ UUID(0L, vmId), "<unnamed>", UnnamedUser,
+ VmImage(flopsHistory, cores)
+ )
+ entries[vmId] = TraceEntryImpl(
+ flopsHistory.firstOrNull()?.tick ?: -1,
+ vmWorkload
+ )
+ }
+
+ // Create the entry iterator
+ iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {}
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ private data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}