summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc20/src
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-20 15:21:35 +0200
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-04-20 15:21:35 +0200
commit6981d5581e2ce5c6df42dfbf133c350bd9c35a0f (patch)
tree74e8993babb28dd56950f1da69eda2d97735a0e8 /opendc/opendc-experiments-sc20/src
parent60372f0022d423efd5267ef4008d9afcbe870911 (diff)
parent3e056406616860c77168d827f1ca9d8d3c79c08e (diff)
Merge branch 'bug/experiment-issues' into '2.x'
Address issues during experiments. See merge request opendc/opendc-simulator!61
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt101
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt290
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt65
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt193
4 files changed, 606 insertions, 43 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 7e6398bb..bac0de21 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -6,23 +6,71 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.flow.first
-import java.io.BufferedWriter
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.Closeable
-import java.io.FileWriter
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
class Sc20Monitor(
destination: String
) : Closeable {
- private val outputFile = BufferedWriter(FileWriter(destination))
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
-
- init {
- outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw\n")
+ private val schema = SchemaBuilder
+ .record("slice")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("requestedBurst").type().longType().noDefault()
+ .name("grantedBurst").type().longType().noDefault()
+ .name("overcommissionedBurst").type().longType().noDefault()
+ .name("interferedBurst").type().longType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("cpuDemand").type().doubleType().noDefault()
+ .name("numberOfDeployedImages").type().intType().noDefault()
+ .name("server").type().stringType().noDefault()
+ .name("hostState").type().stringType().noDefault()
+ .name("hostUsage").type().doubleType().noDefault()
+ .name("powerDraw").type().doubleType().noDefault()
+ .name("totalSubmittedVms").type().longType().noDefault()
+ .name("totalQueuedVms").type().longType().noDefault()
+ .name("totalRunningVms").type().longType().noDefault()
+ .name("totalFinishedVms").type().longType().noDefault()
+ .endRecord()
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+ private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
+ private val writerThread = thread(start = true, name = "sc20-writer") {
+ try {
+ while (true) {
+ val record = queue.take()
+ writer.write(record)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ writer.close()
+ }
}
suspend fun onVmStateChanged(server: Server) {}
- suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
+ suspend fun serverStateChanged(
+ driver: VirtDriver,
+ server: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long
+ ) {
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
val duration = simulationContext.clock.millis() - lastServerState.second
@@ -36,6 +84,10 @@ class Sc20Monitor(
0.0,
0,
server,
+ submittedVms,
+ queuedVms,
+ runningVms,
+ finishedVms,
duration
)
}
@@ -55,21 +107,44 @@ class Sc20Monitor(
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
+ submittedVms: Long,
+ queuedVms: Long,
+ runningVms: Long,
+ finishedVms: Long,
duration: Long = 5 * 60 * 1000L
) {
- lastServerStates.remove(hostServer)
-
// Assume for now that the host is not virtualized and measure the current power draw
val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
- outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
- outputFile.newLine()
+ val record = GenericData.Record(schema)
+ record.put("time", time)
+ record.put("duration", duration)
+ record.put("requestedBurst", requestedBurst)
+ record.put("grantedBurst", grantedBurst)
+ record.put("overcommissionedBurst", overcommissionedBurst)
+ record.put("interferedBurst", interferedBurst)
+ record.put("cpuUsage", cpuUsage)
+ record.put("cpuDemand", cpuDemand)
+ record.put("numberOfDeployedImages", numberOfDeployedImages)
+ record.put("server", hostServer.uid)
+ record.put("hostState", hostServer.state)
+ record.put("hostUsage", usage)
+ record.put("powerDraw", powerDraw)
+ record.put("totalSubmittedVms", submittedVms)
+ record.put("totalQueuedVms", queuedVms)
+ record.put("totalRunningVms", runningVms)
+ record.put("totalFinishedVms", finishedVms)
+
+ queue.put(record)
}
override fun close() {
- outputFile.flush()
- outputFile.close()
+ // Busy loop to wait for writer thread to finish
+ while (queue.isNotEmpty()) {
+ Thread.sleep(500)
+ }
+ writerThread.interrupt()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
new file mode 100644
index 00000000..0a7718e9
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
@@ -0,0 +1,290 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
+import com.atlarge.opendc.compute.core.image.VmImage
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.format.trace.TraceEntry
+import com.atlarge.opendc.format.trace.TraceReader
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.Statistics
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.io.api.Binary
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
+
+/**
+ * A [TraceReader] for the internal VM workload trace format.
+ *
+ * @param traceFile The directory of the traces.
+ * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
+ */
+@OptIn(ExperimentalStdlibApi::class)
+class Sc20ParquetTraceReader(
+ traceFile: File,
+ performanceInterferenceModel: PerformanceInterferenceModel,
+ selectedVms: List<String>,
+ random: Random
+) : TraceReader<VmWorkload> {
+ /**
+ * The internal iterator to use for this reader.
+ */
+ private val iterator: Iterator<TraceEntry<VmWorkload>>
+
+ /**
+ * The intermediate buffer to store the read records in.
+ */
+ private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024)
+
+ /**
+ * An optional filter for filtering the selected VMs
+ */
+ private val filter =
+ if (selectedVms.isEmpty())
+ null
+ else
+ FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
+
+ /**
+ * A poisonous fragment.
+ */
+ private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0))
+
+ /**
+ * The thread to read the records in.
+ */
+ private val readerThread = thread(start = true, name = "sc20-reader") {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ try {
+ while (true) {
+ val record = reader.read()
+
+ if (record == null) {
+ queue.put(poison)
+ break
+ }
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ queue.put(id to fragment)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Fill the buffers with the VMs
+ */
+ private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) {
+ if (!hasNext) {
+ return
+ }
+
+ val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>()
+ queue.drainTo(fragments)
+
+ for ((id, fragment) in fragments) {
+ if (id == poison.first) {
+ hasNext = false
+ return
+ }
+ buffers[id]?.forEach { it.add(fragment) }
+ }
+ }
+
+ /**
+ * A flag to indicate whether the reader has more entries.
+ */
+ private var hasNext: Boolean = true
+
+ /**
+ * Initialize the reader.
+ */
+ init {
+ val takenIds = mutableSetOf<UUID>()
+ val entries = mutableMapOf<String, GenericData.Record>()
+ val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>()
+
+ val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ while (true) {
+ val record = metaReader.read() ?: break
+ val id = record["id"].toString()
+ entries[id] = record
+ }
+
+ metaReader.close()
+
+ val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+
+ // Create the entry iterator
+ iterator = selection.asSequence()
+ .mapNotNull { entries[it] }
+ .mapIndexed { index, record ->
+ val id = record["id"].toString()
+ val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
+ val maxCores = record["maxCores"] as Int
+ val requiredMemory = record["requiredMemory"] as Long
+ val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
+
+ assert(uid !in takenIds)
+ takenIds += uid
+
+ println(id)
+
+ val internalBuffer = mutableListOf<FlopsHistoryFragment>()
+ val externalBuffer = mutableListOf<FlopsHistoryFragment>()
+ buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
+ val fragments = sequence<FlopsHistoryFragment> {
+ repeat@while (true) {
+ if (externalBuffer.isEmpty()) {
+ if (hasNext) {
+ pull(buffers)
+ continue
+ } else {
+ break
+ }
+ }
+
+ internalBuffer.addAll(externalBuffer)
+ externalBuffer.clear()
+
+ for (fragment in internalBuffer) {
+ yield(fragment)
+
+ if (fragment.tick >= endTime) {
+ break@repeat
+ }
+ }
+
+ internalBuffer.clear()
+ }
+
+ buffers.remove(id)
+ }
+ val relevantPerformanceInterferenceModelItems =
+ PerformanceInterferenceModel(
+ performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
+ Random(random.nextInt())
+ )
+ val vmWorkload = VmWorkload(
+ uid, "VM Workload $id", UnnamedUser,
+ VmImage(
+ uid,
+ id,
+ mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ fragments,
+ maxCores,
+ requiredMemory
+ )
+ )
+
+ TraceEntryImpl(submissionTime, vmWorkload)
+ }
+ .sortedBy { it.submissionTime }
+ .toList()
+ .iterator()
+ }
+
+ override fun hasNext(): Boolean = iterator.hasNext()
+
+ override fun next(): TraceEntry<VmWorkload> = iterator.next()
+
+ override fun close() {
+ readerThread.interrupt()
+ }
+
+ private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
+ override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
+
+ override fun canDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
+ }
+
+ override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
+ val min = statistics.min
+ val max = statistics.max
+
+ return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
+ }
+ }
+
+ /**
+ * An unnamed user.
+ */
+ private object UnnamedUser : User {
+ override val name: String = "<unnamed>"
+ override val uid: UUID = UUID.randomUUID()
+ }
+
+ /**
+ * An entry in the trace.
+ */
+ private data class TraceEntryImpl(
+ override var submissionTime: Long,
+ override val workload: VmWorkload
+ ) : TraceEntry<VmWorkload>
+}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index c75bde30..028cfb9a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -29,7 +29,6 @@ import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
@@ -45,7 +44,6 @@ import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.failure.FaultInjector
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
-import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
@@ -70,13 +68,15 @@ class ExperimentParameters(parser: ArgParser) {
val environmentFile by parser.storing("path to the environment file")
val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null }
val outputFile by parser.storing("path to where the output should be stored")
- .default { "sc20-experiment-results.csv" }
+ .default { "data/results-${System.currentTimeMillis()}.parquet" }
val selectedVms by parser.storing("the VMs to run") { parseVMs(this) }
.default { emptyList() }
val selectedVmsFile by parser.storing("path to a file containing the VMs to run") {
parseVMs(FileReader(File(this)).readText())
}
.default { emptyList() }
+ val seed by parser.storing("the random seed") { toInt() }
+ .default(0)
val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures")
val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem")
@@ -118,6 +118,14 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
ArgParser(args).parseInto(::ExperimentParameters).run {
+ println("trace-directory: $traceDirectory")
+ println("environment-file: $environmentFile")
+ println("performance-interference-file: $performanceInterferenceFile")
+ println("selected-vms-file: $selectedVmsFile")
+ println("seed: $seed")
+ println("failures: $failures")
+ println("allocation-policy: $allocationPolicy")
+
val start = System.currentTimeMillis()
val monitor = Sc20Monitor(outputFile)
@@ -135,7 +143,7 @@ fun main(args: Array<String>) {
"active-servers-inv" to NumberOfActiveServersAllocationPolicy(true),
"provisioned-cores" to ProvisionedCoresAllocationPolicy(),
"provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true),
- "random" to RandomAllocationPolicy()
+ "random" to RandomAllocationPolicy(Random(seed))
)
if (allocationPolicy !in allocationPolicies) {
@@ -174,20 +182,16 @@ fun main(args: Array<String>) {
delay(10)
val hypervisors = scheduler.drivers()
- var availableHypervisors = hypervisors.size
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server)
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
hypervisor.server.events
.onEach { event ->
when (event) {
is ServerEvent.StateChanged -> {
- monitor.serverStateChanged(hypervisor, event.server)
-
- if (event.server.state == ServerState.ERROR)
- availableHypervisors -= 1
+ monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
}
}
}
@@ -204,7 +208,11 @@ fun main(args: Array<String>) {
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.hostServer
+ event.hostServer,
+ scheduler.submittedVms,
+ scheduler.queuedVms,
+ scheduler.runningVms,
+ scheduler.finishedVms
)
}
}
@@ -216,7 +224,7 @@ fun main(args: Array<String>) {
val domain = root.newDomain(name = "failures")
domain.launch {
chan.receive()
- val random = Random(0)
+ val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
for (node in bareMetalProvisioner.nodes()) {
val cluster = node.metadata[NODE_CLUSTER] as String
@@ -229,15 +237,13 @@ fun main(args: Array<String>) {
null
}
- val finish = Channel<Unit>(Channel.RENDEZVOUS)
-
- var submitted = 0
- var finished = 0
- val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList())
+ var submitted = 0L
+ val finished = Channel<Unit>(Channel.RENDEZVOUS)
+ val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
while (reader.hasNext()) {
val (time, workload) = reader.next()
- delay(max(0, time - simulationContext.clock.millis()))
submitted++
+ delay(max(0, time - simulationContext.clock.millis()))
launch {
chan.send(Unit)
val server = scheduler.deploy(
@@ -247,27 +253,26 @@ fun main(args: Array<String>) {
// Monitor server events
server.events
.onEach {
- if (it is ServerEvent.StateChanged)
+ if (it is ServerEvent.StateChanged) {
monitor.onVmStateChanged(it.server)
-
- // Detect whether the VM has finished running
- if (it.server.state == ServerState.SHUTOFF) {
- finished++
}
- if (finished == submitted && !reader.hasNext()) {
- finish.send(Unit)
- }
+ finished.send(Unit)
}
.collect()
}
}
- finish.receive()
- scheduler.terminate()
+ while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) {
+ finished.receive()
+ }
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
failureDomain?.cancel()
- println(simulationContext.clock.instant())
- println("${System.currentTimeMillis() - start} milliseconds")
+ scheduler.terminate()
+ reader.close()
+ println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
}
runBlocking {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
new file mode 100644
index 00000000..d005a157
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
@@ -0,0 +1,193 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20
+
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+fun main() {
+ val metaSchema = SchemaBuilder
+ .record("meta")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("submissionTime").type().longType().noDefault()
+ .name("endTime").type().longType().noDefault()
+ .name("maxCores").type().intType().noDefault()
+ .name("requiredMemory").type().longType().noDefault()
+ .endRecord()
+ val schema = SchemaBuilder
+ .record("trace")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("flops").type().longType().noDefault()
+ .endRecord()
+
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val vmIdCol = 19
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ val dest = File("../traces/solvinity/small-parquet")
+ val traceDirectory = File("../traces/solvinity/small")
+ val vms =
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
+ .withSchema(metaSchema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ val allFragments = mutableListOf<Fragment>()
+
+ vms
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+
+ vmId = vmFile.name
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores)
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
+
+ if (last != null) {
+ yield(last!!)
+ }
+ }
+
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
+
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+
+ writer.close()
+ metaWriter.close()
+}
+
+data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)