summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorgios Andreadis <info@gandreadis.com>2020-03-10 14:50:53 +0100
committerGeorgios Andreadis <info@gandreadis.com>2020-03-11 12:47:56 +0100
commitb67045d699d0d6d33df9a7ee4ee99148528a936a (patch)
tree4ef62992ac839781e265ced67fd6b42f448f5e38
parent1ccfcb28bb91c9dc456a1f324a0be6300086eb28 (diff)
Write parsers for internal (proprietary) traces and environment files
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt14
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt130
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt163
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt2
4 files changed, 301 insertions, 8 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 76f7b600..b48abf2e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -33,9 +33,9 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
-import com.atlarge.opendc.format.environment.sc20.Sc20EnvironmentReader
+import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
-import com.atlarge.opendc.format.trace.vm.VmTraceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
@@ -47,8 +47,8 @@ import kotlin.math.max
* Main entry point of the experiment.
*/
fun main(args: Array<String>) {
- if (args.isEmpty()) {
- println("error: Please provide path to directory containing VM trace files")
+ if (args.size < 2) {
+ println("error: Please provide path to directory containing VM trace files and the path to the environment file")
return
}
@@ -64,7 +64,7 @@ fun main(args: Array<String>) {
val root = system.newDomain("root")
root.launch {
- val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json"))
+ val environment = Sc20ClusterEnvironmentReader(File(args[1]))
.use { it.construct(root) }
val performanceInterferenceModel = Sc20PerformanceInterferenceReader(
@@ -80,8 +80,8 @@ fun main(args: Array<String>) {
hypervisorMonitor
)
- val reader = VmTraceReader(File(args[0]), performanceInterferenceModel)
- delay(1376314846 * 1000L)
+ val reader = Sc20TraceReader(File(args[0]), performanceInterferenceModel)
+// delay(1376314846 * 1000L)
while (reader.hasNext()) {
val (time, workload) = reader.next()
delay(max(0, time * 1000 - simulationContext.clock.millis()))
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
new file mode 100644
index 00000000..96e84976
--- /dev/null
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -0,0 +1,130 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.format.environment.sc20
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
+import com.atlarge.opendc.core.Environment
+import com.atlarge.opendc.core.Platform
+import com.atlarge.opendc.core.Zone
+import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.UUID
+
+/**
+ * A [EnvironmentReader] for the internal environment format.
+ *
+ * @param environmentFile The file describing the physical cluster.
+ */
+class Sc20ClusterEnvironmentReader(
+ private val environmentFile: File
+) : EnvironmentReader {
+ @Suppress("BlockingMethodInNonBlockingContext")
+ override suspend fun construct(dom: Domain): Environment {
+ var clusterIdCol = 0
+ var speedCol = 0
+ var numberOfHostsCol = 0
+ var memoryPerHostCol = 0
+ var coresPerHostCol = 0
+
+ var clusterId: String
+ var speed: Double
+ var numberOfHosts: Int
+ var memoryPerHost: Long
+ var coresPerHost: Int
+
+ val nodes = mutableListOf<SimpleBareMetalDriver>()
+
+ BufferedReader(FileReader(environmentFile)).use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the file
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEachIndexed { idx, line ->
+ val values = line.split(";")
+
+ if (idx == 0) {
+ val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
+ clusterIdCol = header["ClusterID"]!!
+ speedCol = header["Speed"]!!
+ numberOfHostsCol = header["numberOfHosts"]!!
+ memoryPerHostCol = header["memoryCapacityPerHost"]!!
+ coresPerHostCol = header["coreCountPerHost"]!!
+ return@forEachIndexed
+ }
+
+ clusterId = values[clusterIdCol].trim()
+ speed = values[speedCol].trim().toDouble() * 1000.0
+ numberOfHosts = values[numberOfHostsCol].trim().toInt()
+ memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
+ coresPerHost = values[coresPerHostCol].trim().toInt()
+
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+
+ repeat(numberOfHosts) {
+ nodes.add(
+ SimpleBareMetalDriver(
+ UUID.randomUUID(),
+ "node-${clusterId}-${it}",
+ List(coresPerHost) { coreId ->
+ ProcessingUnit(unknownProcessingNode, coreId, speed)
+ },
+ listOf(unknownMemoryUnit),
+ dom.newDomain("node-${clusterId}-${it}")
+ )
+ )
+ }
+ }
+ }
+
+ val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ for (node in nodes) {
+ provisioningService.create(node)
+ }
+
+ val serviceRegistry = ServiceRegistryImpl()
+ serviceRegistry[ProvisioningService.Key] = provisioningService
+
+ val platform = Platform(
+ UUID.randomUUID(), "sc20-platform", listOf(
+ Zone(UUID.randomUUID(), "zone", serviceRegistry)
+ )
+ )
+
+ return Environment("SC20 Environment", null, listOf(platform))
+ }
+
+ override fun close() {}
+}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
new file mode 100644
index 00000000..d4656823
--- /dev/null
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -0,0 +1,163 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.format.trace.sc20
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.UUID
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param traceDirectory The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+class Sc20TraceReader(
+ traceDirectory: File,
+ performanceInterferenceModel: PerformanceInterferenceModel
+) : TraceReader<VmWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>>
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val entries = mutableMapOf<String, TraceEntry<VmWorkload>>()
+
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val vmIdCol = 19
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .forEach { vmFile ->
+ println(vmFile)
+ val flopsHistory = mutableListOf<FlopsHistoryFragment>()
+ var vmId = ""
+ var cores = -1
+ var requiredMemory = -1L
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .filter { line ->
+ // Ignore comments in the trace
+ !line.startsWith("#") && line.isNotBlank()
+ }
+ .forEach { line ->
+ val values = line.split(" ")
+
+ vmId = values[vmIdCol].trim()
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong()
+
+ val flops: Long = (cpuUsage * 1_000_000L * 5 * 60 * cores).toLong()
+
+ if (flopsHistory.isEmpty()) {
+ flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage))
+ } else {
+ if (flopsHistory.last().flops != flops) {
+ flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage))
+ } else {
+ val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
+ flopsHistory.add(
+ FlopsHistoryFragment(
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage
+ )
+ )
+ }
+ }
+ }
+ }
+
+ val uuid = UUID(0L, vmId.hashCode().toLong())
+
+ val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadIds.contains(uuid) }.toSet()
+ )
+
+ val vmWorkload = VmWorkload(
+ uuid, "VM Workload $vmId", UnnamedUser,
+ VmImage(
+ uuid,
+ vmId,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ flopsHistory,
+ cores,
+ requiredMemory
+ )
+ )
+ entries[vmId] = TraceEntryImpl(
+ flopsHistory.firstOrNull()?.tick ?: -1,
+ vmWorkload
+ )
+ }
+
+ // Create the entry iterator
+ iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {}
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ private data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
index 2adf99b1..b4964b26 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt
@@ -38,7 +38,7 @@ import java.io.FileReader
import java.util.UUID
/**
- * A [TraceReader] for the VM workload trace format.
+ * A [TraceReader] for the public VM workload trace format.
*
* @param traceDirectory The directory of the traces.
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.