author    Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-04-16 15:30:34 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-04-16 15:30:34 +0200
commit    6cd93b57945b289b2e14556f7ceaa193326eff78 (patch)
tree      9e2980e74a898a01cb9cd4a37fe0d37ca8d98f9f /opendc
parent    e097aeb16d77c260126b65c7f13330076d800d52 (diff)
bug: Fix issues related to early termination
Diffstat (limited to 'opendc')
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt | 20
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 4
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 30
-rw-r--r--  opendc/opendc-experiments-sc20/build.gradle.kts | 2
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 23
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt | 121
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 27
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt | 7
8 files changed, 160 insertions, 74 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index e3227540..36bbfa45 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -23,19 +23,17 @@ class VmImage(
val clock = simulationContext.clock
val job = coroutineContext[Job]!!
- for (fragments in flopsHistory.chunked(128)) {
- for (fragment in fragments) {
- job.ensureActive()
+ for (fragment in flopsHistory) {
+ job.ensureActive()
- if (fragment.flops == 0L) {
- delay(fragment.duration)
- } else {
- val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
- val burst = LongArray(cores) { fragment.flops / cores }
- val usage = DoubleArray(cores) { fragment.usage / cores }
+ if (fragment.flops == 0L) {
+ delay(fragment.duration)
+ } else {
+ val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
+ val burst = LongArray(cores) { fragment.flops / cores }
+ val usage = DoubleArray(cores) { fragment.usage / cores }
- ctx.run(burst, usage, clock.millis() + fragment.duration)
- }
+ ctx.run(burst, usage, clock.millis() + fragment.duration)
}
}
}
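
The VmImage change above drops the chunked(128) batching and checks for cancellation on every fragment, so an early-terminated VM stops immediately instead of only at chunk boundaries. A minimal sketch of that cooperative-cancellation pattern; the Fragment type and the replay function are illustrative stand-ins, not the OpenDC classes:

    import kotlinx.coroutines.Job
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.ensureActive

    data class Fragment(val flops: Long, val durationMs: Long)

    suspend fun replay(history: List<Fragment>) = coroutineScope {
        val job = coroutineContext[Job]!!
        for (fragment in history) {
            job.ensureActive() // throws CancellationException once the job is cancelled
            if (fragment.flops == 0L) {
                delay(fragment.durationMs) // idle fragment: just let simulated time pass
            } else {
                // ... execute the burst, as ctx.run(...) does in VmImage ...
            }
        }
    }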
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 8a32bc43..53fa463b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -243,7 +243,7 @@ class SimpleVirtDriver(
}
// XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
- duration = max(300.0, ceil(duration))
+ duration = 300.0
val totalAllocatedUsage = maxUsage - availableUsage
var totalAllocatedBurst = 0L
@@ -335,7 +335,7 @@ class SimpleVirtDriver(
eventFlow.emit(
HypervisorEvent.SliceFinished(
this@SimpleVirtDriver,
- totalRequestedSubBurst,
+ totalRequestedBurst,
min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing
totalOvercommissionedBurst,
totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 09036d0d..520f6dc5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -26,6 +26,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
+import kotlin.math.max
@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
@@ -57,6 +58,10 @@ class SimpleVirtProvisioningService(
public var queuedVms = 0L
public var runningVms = 0L
public var finishedVms = 0L
+ public var unscheduledVms = 0L
+
+ private var maxCores = 0
+ private var maxMemory = 0L
/**
* The allocation logic to use.
@@ -124,15 +129,24 @@ class SimpleVirtProvisioningService(
}
private suspend fun schedule() {
- val log = simulationContext.log
+ val clock = simulationContext.clock
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
val requiredMemory = (imageInstance.image as VmImage).requiredMemory
- val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break
+ val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
+
+ if (selectedHv == null) {
+ if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
+ unscheduledVms++
+ println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}")
+ }
+
+ break
+ }
try {
- log.info("Spawning ${imageInstance.image} on ${selectedHv.server}")
+ println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}")
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -157,6 +171,7 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
+ println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}")
runningVms--
finishedVms++
@@ -178,6 +193,8 @@ class SimpleVirtProvisioningService(
selectedHv.numberOfActiveServers--
selectedHv.provisionedCores -= imageInstance.flavor.cpuCount
selectedHv.availableMemory += requiredMemory
+ } catch (e: Throwable) {
+ e.printStackTrace()
}
}
}
@@ -196,13 +213,18 @@ class SimpleVirtProvisioningService(
server.flavor.memorySize,
0
)
+ maxCores = max(maxCores, server.flavor.cpuCount)
+ maxMemory = max(maxMemory, server.flavor.memorySize)
hypervisors[server] = hv
}
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
- requestCycle()
+
+ if (incomingImages.isNotEmpty()) {
+ requestCycle()
+ }
}
else -> throw IllegalStateException()
}
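
The provisioner now remembers the largest core count and memory size seen across all hosts, so a VM that can never fit anywhere is counted as unscheduled instead of blocking the queue forever. A condensed sketch of that feasibility check, with illustrative names rather than the OpenDC API:

    import kotlin.math.max

    data class Flavor(val cpuCount: Int, val memorySize: Long)

    class FeasibilityTracker {
        private var maxCores = 0
        private var maxMemory = 0L
        var unscheduledVms = 0L
            private set

        // Called whenever a hypervisor host comes online.
        fun onHostUp(flavor: Flavor) {
            maxCores = max(maxCores, flavor.cpuCount)
            maxMemory = max(maxMemory, flavor.memorySize)
        }

        // False means no host could ever fit this VM, even when idle,
        // so it should be dropped rather than retried forever.
        fun isFeasible(vm: Flavor): Boolean {
            val ok = vm.cpuCount <= maxCores && vm.memorySize <= maxMemory
            if (!ok) unscheduledVms++
            return ok
        }
    }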
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index a78ea745..8611ffa7 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -32,7 +32,7 @@ plugins {
application {
mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt"
- applicationDefaultJvmArgs = listOf("-Xmx2200M", "-Xms1800M")
+ applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M")
}
dependencies {
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 7c198e56..bac0de21 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -12,6 +12,8 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.Closeable
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
class Sc20Monitor(
destination: String
@@ -45,6 +47,19 @@ class Sc20Monitor(
.withPageSize(4 * 1024 * 1024) // For compression
.withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
.build()
+ private val queue = ArrayBlockingQueue<GenericData.Record>(2048)
+ private val writerThread = thread(start = true, name = "sc20-writer") {
+ try {
+ while (true) {
+ val record = queue.take()
+ writer.write(record)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ writer.close()
+ }
+ }
suspend fun onVmStateChanged(server: Server) {}
@@ -122,10 +137,14 @@ class Sc20Monitor(
record.put("totalRunningVms", runningVms)
record.put("totalFinishedVms", finishedVms)
- writer.write(record)
+ queue.put(record)
}
override fun close() {
- writer.close()
+ // Busy loop to wait for writer thread to finish
+ while (queue.isNotEmpty()) {
+ Thread.sleep(500)
+ }
+ writerThread.interrupt()
}
}
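
Sc20Monitor now hands records to a dedicated writer thread through a bounded queue, so the simulation loop no longer blocks on Parquet I/O. A self-contained sketch of the pattern, with a plain String sink standing in for AvroParquetWriter:

    import java.util.concurrent.ArrayBlockingQueue
    import kotlin.concurrent.thread

    class AsyncSink(private val write: (String) -> Unit) : AutoCloseable {
        private val queue = ArrayBlockingQueue<String>(2048)

        private val worker = thread(name = "writer") {
            try {
                while (true) write(queue.take()) // blocks until a record arrives
            } catch (e: InterruptedException) {
                // interrupt() is the shutdown signal; do not rethrow
            }
        }

        fun put(record: String) = queue.put(record) // applies backpressure when full

        override fun close() {
            // Drain before interrupting so queued records are not lost.
            while (queue.isNotEmpty()) Thread.sleep(100)
            worker.interrupt()
            worker.join()
        }
    }

Note that close() mirrors the commit's busy-wait; a poison record, as the trace reader below uses, would also close the small window where the last take() has happened but the write has not yet finished.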
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
index 30456204..24ff9eed 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt
@@ -39,7 +39,6 @@ import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.io.api.Binary
import java.io.File
import java.io.Serializable
@@ -47,7 +46,9 @@ import java.util.Deque
import java.util.SortedSet
import java.util.TreeSet
import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.LinkedBlockingDeque
+import kotlin.concurrent.thread
import kotlin.random.Random
/**
@@ -69,37 +70,82 @@ class Sc20ParquetTraceReader(
private val iterator: Iterator<TraceEntry<VmWorkload>>
/**
- * Fill the buffers of the VMs
+ * The intermediate buffer to store the read records in.
*/
- private fun pull(reader: ParquetReader<GenericData.Record>, buffers: Map<String, Deque<FlopsHistoryFragment>>) {
+ private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(128)
+
+ /**
+ * An optional filter for filtering the selected VMs
+ */
+ private val filter =
+ if (selectedVms.isEmpty())
+ null
+ else
+ FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
+
+ /**
+ * A poisonous fragment.
+ */
+ private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0))
+
+ /**
+ * The thread to read the records in.
+ */
+ private val readerThread = thread(start = true, name = "sc20-reader") {
+ val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ .disableCompatibility()
+ .run { if (filter != null) withFilter(filter) else this }
+ .build()
+
+ try {
+ while (true) {
+ val record = reader.read()
+
+ if (record == null) {
+ queue.put(poison)
+ break
+ }
+
+ val id = record["id"].toString()
+ val tick = record["time"] as Long
+ val duration = record["duration"] as Long
+ val cores = record["cores"] as Int
+ val cpuUsage = record["cpuUsage"] as Double
+ val flops = record["flops"] as Long
+
+ val fragment = FlopsHistoryFragment(
+ tick,
+ flops,
+ duration,
+ cpuUsage,
+ cores
+ )
+
+ queue.put(id to fragment)
+ }
+ } catch (e: InterruptedException) {
+ // Do not rethrow this
+ } finally {
+ reader.close()
+ }
+ }
+
+ /**
+ * Fill the buffers with the VMs
+ */
+ private fun pull(buffers: Map<String, Deque<FlopsHistoryFragment>>) {
if (!hasNext) {
return
}
- repeat(buffers.size) {
- val record = reader.read()
+ repeat(16) {
+ val (id, fragment) = queue.take()
- if (record == null) {
+ if (id == poison.first) {
hasNext = false
- reader.close()
return
}
- val id = record["id"].toString()
- val tick = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
- val flops = record["flops"] as Long
-
- val fragment = FlopsHistoryFragment(
- tick,
- flops,
- duration,
- cpuUsage,
- cores
- )
-
buffers[id]?.add(fragment)
}
}
@@ -116,30 +162,20 @@ class Sc20ParquetTraceReader(
val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>()
val buffers = mutableMapOf<String, Deque<FlopsHistoryFragment>>()
- val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms))))
-
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
.build()
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
- .build()
-
var idx = 0
while (true) {
val record = metaReader.read() ?: break
val id = record["id"].toString()
val submissionTime = record["submissionTime"] as Long
+ val endTime = record["endTime"] as Long
val maxCores = record["maxCores"] as Int
val requiredMemory = record["requiredMemory"] as Long
-
+ val uuid = UUID(0, (idx++).toLong())
println(id)
val buffer = LinkedBlockingDeque<FlopsHistoryFragment>()
@@ -148,16 +184,23 @@ class Sc20ParquetTraceReader(
while (true) {
if (buffer.isEmpty()) {
if (hasNext) {
- pull(reader, buffers)
+ pull(buffers)
continue
} else {
break
}
}
- yield(buffer.poll())
+
+ val fragment = buffer.poll()
+ yield(fragment)
+
+ if (fragment.tick >= endTime) {
+ break
+ }
}
+
+ buffers.remove(id)
}
- val uuid = UUID(0, (idx++).toLong())
val relevantPerformanceInterferenceModelItems =
PerformanceInterferenceModel(
performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(),
@@ -191,7 +234,9 @@ class Sc20ParquetTraceReader(
override fun next(): TraceEntry<VmWorkload> = iterator.next()
- override fun close() {}
+ override fun close() {
+ readerThread.interrupt()
+ }
private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
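
The reader side uses the classic poison-pill idiom: the producer thread pushes a sentinel pair when the Parquet file is exhausted, so the consumer can detect end-of-stream without a side channel. A minimal generic sketch of the idiom:

    import java.util.concurrent.ArrayBlockingQueue
    import kotlin.concurrent.thread

    fun main() {
        val queue = ArrayBlockingQueue<String>(128)
        val poison = "\u0000" // sentinel that can never occur as a real VM id

        val producer = thread(name = "reader") {
            for (record in listOf("vm-a", "vm-b", "vm-c")) queue.put(record)
            queue.put(poison) // end-of-stream marker
        }

        while (true) {
            val record = queue.take()
            if (record == poison) break // producer is done
            println(record)
        }
        producer.join()
    }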
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 19055ad3..8478b592 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -237,13 +237,10 @@ fun main(args: Array<String>) {
null
}
- var submitted = 0L
- val finish = Channel<Unit>(Channel.RENDEZVOUS)
-
+ val finished = Channel<Unit>(Channel.RENDEZVOUS)
val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
while (reader.hasNext()) {
val (time, workload) = reader.next()
- submitted++
delay(max(0, time - simulationContext.clock.millis()))
launch {
chan.send(Unit)
@@ -254,24 +251,26 @@ fun main(args: Array<String>) {
// Monitor server events
server.events
.onEach {
- if (it is ServerEvent.StateChanged)
+ if (it is ServerEvent.StateChanged) {
monitor.onVmStateChanged(it.server)
-
- if (scheduler.submittedVms == submitted && scheduler.runningVms <= 1 && !reader.hasNext()) {
- finish.send(Unit)
}
+
+ finished.send(Unit)
}
.collect()
}
}
- finish.receive()
- failureDomain?.cancel()
- launch {
- scheduler.terminate()
+ while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) {
+ finished.receive()
}
- println(simulationContext.clock.instant())
- println("${System.currentTimeMillis() - start} milliseconds")
+
+ println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ reader.close()
+ println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds")
}
runBlocking {
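
TestExperiment replaces the old one-shot "last VM shut off" check with a counting loop: every server event sends on a rendezvous channel, and the main coroutine keeps receiving until finished plus unscheduled VMs equals the number submitted. A stripped-down sketch of that handshake; the counters here are simplified stand-ins for the scheduler's fields:

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.channels.Channel

    fun main() = runBlocking {
        val finished = Channel<Unit>(Channel.RENDEZVOUS)
        val submitted = 3
        var done = 0

        repeat(submitted) { i ->
            launch {
                delay((i + 1) * 100L) // stand-in for a VM running to completion
                finished.send(Unit)   // each state-change event signals the main loop
            }
        }

        // Re-evaluate the exit condition after every event instead of relying
        // on a single heuristic that could miss early-terminated VMs.
        while (done < submitted) {
            finished.receive()
            done++
        }
        println("all $submitted VMs accounted for")
    }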
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
index 7f429b89..d005a157 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
@@ -46,6 +46,7 @@ fun main() {
.fields()
.name("id").type().stringType().noDefault()
.name("submissionTime").type().longType().noDefault()
+ .name("endTime").type().longType().noDefault()
.name("maxCores").type().intType().noDefault()
.name("requiredMemory").type().longType().noDefault()
.endRecord()
@@ -92,7 +93,6 @@ fun main() {
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
- var timestamp = -1L
var cores = -1
var minTime = Long.MAX_VALUE
@@ -112,7 +112,7 @@ fun main() {
val values = line.split(" ")
vmId = vmFile.name
- timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
@@ -150,13 +150,16 @@ fun main() {
}
}
+ var maxTime = Long.MIN_VALUE
flopsFragments.forEach { fragment ->
allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
}
val metaRecord = GenericData.Record(metaSchema)
metaRecord.put("id", vmId)
metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
metaRecord.put("maxCores", maxCores)
metaRecord.put("requiredMemory", requiredMemory)
metaWriter.write(metaRecord)