diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-03-10 14:50:53 +0100 |
|---|---|---|
| committer | Georgios Andreadis <info@gandreadis.com> | 2020-03-11 12:47:56 +0100 |
| commit | b67045d699d0d6d33df9a7ee4ee99148528a936a (patch) | |
| tree | 4ef62992ac839781e265ced67fd6b42f448f5e38 /opendc | |
| parent | 1ccfcb28bb91c9dc456a1f324a0be6300086eb28 (diff) | |
Write parsers for internal (proprietary) traces and environment files
Diffstat (limited to 'opendc')
4 files changed, 301 insertions, 8 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 76f7b600..b48abf2e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -33,9 +33,9 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.format.environment.sc20.Sc20EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.vm.VmTraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -47,8 +47,8 @@ import kotlin.math.max * Main entry point of the experiment. */ fun main(args: Array<String>) { - if (args.isEmpty()) { - println("error: Please provide path to directory containing VM trace files") + if (args.size < 2) { + println("error: Please provide path to directory containing VM trace files and the path to the environment file") return } @@ -64,7 +64,7 @@ fun main(args: Array<String>) { val root = system.newDomain("root") root.launch { - val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json")) + val environment = Sc20ClusterEnvironmentReader(File(args[1])) .use { it.construct(root) } val performanceInterferenceModel = Sc20PerformanceInterferenceReader( @@ -80,8 +80,8 @@ fun main(args: Array<String>) { hypervisorMonitor ) - val reader = VmTraceReader(File(args[0]), performanceInterferenceModel) - delay(1376314846 * 1000L) + val reader = Sc20TraceReader(File(args[0]), performanceInterferenceModel) +// delay(1376314846 * 1000L) while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time * 1000 - simulationContext.clock.millis())) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt new file mode 100644 index 00000000..96e84976 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -0,0 +1,130 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.services.ServiceRegistryImpl +import com.atlarge.opendc.format.environment.EnvironmentReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [EnvironmentReader] for the internal environment format. + * + * @param environmentFile The file describing the physical cluster. + */ +class Sc20ClusterEnvironmentReader( + private val environmentFile: File +) : EnvironmentReader { + @Suppress("BlockingMethodInNonBlockingContext") + override suspend fun construct(dom: Domain): Environment { + var clusterIdCol = 0 + var speedCol = 0 + var numberOfHostsCol = 0 + var memoryPerHostCol = 0 + var coresPerHostCol = 0 + + var clusterId: String + var speed: Double + var numberOfHosts: Int + var memoryPerHost: Long + var coresPerHost: Int + + val nodes = mutableListOf<SimpleBareMetalDriver>() + + BufferedReader(FileReader(environmentFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the file + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";") + + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + clusterIdCol = header["ClusterID"]!! + speedCol = header["Speed"]!! + numberOfHostsCol = header["numberOfHosts"]!! + memoryPerHostCol = header["memoryCapacityPerHost"]!! + coresPerHostCol = header["coreCountPerHost"]!! + return@forEachIndexed + } + + clusterId = values[clusterIdCol].trim() + speed = values[speedCol].trim().toDouble() * 1000.0 + numberOfHosts = values[numberOfHostsCol].trim().toInt() + memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L + coresPerHost = values[coresPerHostCol].trim().toInt() + + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + + repeat(numberOfHosts) { + nodes.add( + SimpleBareMetalDriver( + UUID.randomUUID(), + "node-${clusterId}-${it}", + List(coresPerHost) { coreId -> + ProcessingUnit(unknownProcessingNode, coreId, speed) + }, + listOf(unknownMemoryUnit), + dom.newDomain("node-${clusterId}-${it}") + ) + ) + } + } + } + + val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + for (node in nodes) { + provisioningService.create(node) + } + + val serviceRegistry = ServiceRegistryImpl() + serviceRegistry[ProvisioningService.Key] = provisioningService + + val platform = Platform( + UUID.randomUUID(), "sc20-platform", listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment("SC20 Environment", null, listOf(platform)) + } + + override fun close() {} +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt new file mode 100644 index 00000000..d4656823 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -0,0 +1,163 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.sc20 + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceDirectory The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +class Sc20TraceReader( + traceDirectory: File, + performanceInterferenceModel: PerformanceInterferenceModel +) : TraceReader<VmWorkload> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<VmWorkload>> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf<String, TraceEntry<VmWorkload>>() + + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val vmIdCol = 19 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .forEach { vmFile -> + println(vmFile) + val flopsHistory = mutableListOf<FlopsHistoryFragment>() + var vmId = "" + var cores = -1 + var requiredMemory = -1L + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEach { line -> + val values = line.split(" ") + + vmId = values[vmIdCol].trim() + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() + + val flops: Long = (cpuUsage * 1_000_000L * 5 * 60 * cores).toLong() + + if (flopsHistory.isEmpty()) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage)) + } else { + if (flopsHistory.last().flops != flops) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage)) + } else { + val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) + flopsHistory.add( + FlopsHistoryFragment( + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage + ) + ) + } + } + } + } + + val uuid = UUID(0L, vmId.hashCode().toLong()) + + val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadIds.contains(uuid) }.toSet() + ) + + val vmWorkload = VmWorkload( + uuid, "VM Workload $vmId", UnnamedUser, + VmImage( + uuid, + vmId, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + flopsHistory, + cores, + requiredMemory + ) + ) + entries[vmId] = TraceEntryImpl( + flopsHistory.firstOrNull()?.tick ?: -1, + vmWorkload + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<VmWorkload> = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "<unnamed>" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry<VmWorkload> +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt index 2adf99b1..b4964b26 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -38,7 +38,7 @@ import java.io.FileReader import java.util.UUID /** - * A [TraceReader] for the VM workload trace format. + * A [TraceReader] for the public VM workload trace format. * * @param traceDirectory The directory of the traces. * @param performanceInterferenceModel The performance model covering the workload in the VM trace. |
