diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-16 15:30:34 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-16 15:30:34 +0200 |
| commit | 6cd93b57945b289b2e14556f7ceaa193326eff78 (patch) | |
| tree | 9e2980e74a898a01cb9cd4a37fe0d37ca8d98f9f | |
| parent | e097aeb16d77c260126b65c7f13330076d800d52 (diff) | |
bug: Fix issues related to early termination
8 files changed, 160 insertions, 74 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index e3227540..36bbfa45 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -23,19 +23,17 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragments in flopsHistory.chunked(128)) { - for (fragment in fragments) { - job.ensureActive() + for (fragment in flopsHistory) { + job.ensureActive() - if (fragment.flops == 0L) { - delay(fragment.duration) - } else { - val cores = min(fragment.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } + if (fragment.flops == 0L) { + delay(fragment.duration) + } else { + val cores = min(fragment.cores, ctx.server.flavor.cpuCount) + val burst = LongArray(cores) { fragment.flops / cores } + val usage = DoubleArray(cores) { fragment.usage / cores } - ctx.run(burst, usage, clock.millis() + fragment.duration) - } + ctx.run(burst, usage, clock.millis() + fragment.duration) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 8a32bc43..53fa463b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -243,7 +243,7 @@ class SimpleVirtDriver( } // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. - duration = max(300.0, ceil(duration)) + duration = 300.0 val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L @@ -335,7 +335,7 @@ class SimpleVirtDriver( eventFlow.emit( HypervisorEvent.SliceFinished( this@SimpleVirtDriver, - totalRequestedSubBurst, + totalRequestedBurst, min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing totalOvercommissionedBurst, totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 09036d0d..520f6dc5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import kotlin.coroutines.Continuation import kotlin.coroutines.resume +import kotlin.math.max @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( @@ -57,6 +58,10 @@ class SimpleVirtProvisioningService( public var queuedVms = 0L public var runningVms = 0L public var finishedVms = 0L + public var unscheduledVms = 0L + + private var maxCores = 0 + private var maxMemory = 0L /** * The allocation logic to use. @@ -124,15 +129,24 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val log = simulationContext.log + val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { val requiredMemory = (imageInstance.image as VmImage).requiredMemory - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break + val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) + + if (selectedHv == null) { + if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { + unscheduledVms++ + println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}") + } + + break + } try { - log.info("Spawning ${imageInstance.image} on ${selectedHv.server}") + println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}") incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -157,6 +171,7 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { + println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}") runningVms-- finishedVms++ @@ -178,6 +193,8 @@ class SimpleVirtProvisioningService( selectedHv.numberOfActiveServers-- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount selectedHv.availableMemory += requiredMemory + } catch (e: Throwable) { + e.printStackTrace() } } } @@ -196,13 +213,18 @@ class SimpleVirtProvisioningService( server.flavor.memorySize, 0 ) + maxCores = max(maxCores, server.flavor.cpuCount) + maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv - requestCycle() + + if (incomingImages.isNotEmpty()) { + requestCycle() + } } else -> throw IllegalStateException() } diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index a78ea745..8611ffa7 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -32,7 +32,7 @@ plugins { application { mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt" - applicationDefaultJvmArgs = listOf("-Xmx2200M", "-Xms1800M") + applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M") } dependencies { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 7c198e56..bac0de21 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -12,6 +12,8 @@ import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread class Sc20Monitor( destination: String @@ -45,6 +47,19 @@ class Sc20Monitor( .withPageSize(4 * 1024 * 1024) // For compression .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) .build() + private val queue = ArrayBlockingQueue<GenericData.Record>(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } suspend fun onVmStateChanged(server: Server) {} @@ -122,10 +137,14 @@ class Sc20Monitor( record.put("totalRunningVms", runningVms) record.put("totalFinishedVms", finishedVms) - writer.write(record) + queue.put(record) } override fun close() { - writer.close() + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 30456204..24ff9eed 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -39,7 +39,6 @@ import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.hadoop.ParquetReader import org.apache.parquet.io.api.Binary import java.io.File import java.io.Serializable @@ -47,7 +46,9 @@ import java.util.Deque import java.util.SortedSet import java.util.TreeSet import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.LinkedBlockingDeque +import kotlin.concurrent.thread import kotlin.random.Random /** @@ -69,37 +70,82 @@ class Sc20ParquetTraceReader( private val iterator: Iterator<TraceEntry<VmWorkload>> /** - * Fill the buffers of the VMs + * The intermediate buffer to store the read records in. */ - private fun pull(reader: ParquetReader<GenericData.Record>, buffers: Map<String, Deque<FlopsHistoryFragment>>) { + private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(128) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map<String, Deque<FlopsHistoryFragment>>) { if (!hasNext) { return } - repeat(buffers.size) { - val record = reader.read() + repeat(16) { + val (id, fragment) = queue.take() - if (record == null) { + if (id == poison.first) { hasNext = false - reader.close() return } - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = FlopsHistoryFragment( - tick, - flops, - duration, - cpuUsage, - cores - ) - buffers[id]?.add(fragment) } } @@ -116,30 +162,20 @@ class Sc20ParquetTraceReader( val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>() val buffers = mutableMapOf<String, Deque<FlopsHistoryFragment>>() - val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) - val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } .build() - val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - var idx = 0 while (true) { val record = metaReader.read() ?: break val id = record["id"].toString() val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long val maxCores = record["maxCores"] as Int val requiredMemory = record["requiredMemory"] as Long - + val uuid = UUID(0, (idx++).toLong()) println(id) val buffer = LinkedBlockingDeque<FlopsHistoryFragment>() @@ -148,16 +184,23 @@ class Sc20ParquetTraceReader( while (true) { if (buffer.isEmpty()) { if (hasNext) { - pull(reader, buffers) + pull(buffers) continue } else { break } } - yield(buffer.poll()) + + val fragment = buffer.poll() + yield(fragment) + + if (fragment.tick >= endTime) { + break + } } + + buffers.remove(id) } - val uuid = UUID(0, (idx++).toLong()) val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), @@ -191,7 +234,9 @@ class Sc20ParquetTraceReader( override fun next(): TraceEntry<VmWorkload> = iterator.next() - override fun close() {} + override fun close() { + readerThread.interrupt() + } private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable { override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 19055ad3..8478b592 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -237,13 +237,10 @@ fun main(args: Array<String>) { null } - var submitted = 0L - val finish = Channel<Unit>(Channel.RENDEZVOUS) - + val finished = Channel<Unit>(Channel.RENDEZVOUS) val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() - submitted++ delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) @@ -254,24 +251,26 @@ fun main(args: Array<String>) { // Monitor server events server.events .onEach { - if (it is ServerEvent.StateChanged) + if (it is ServerEvent.StateChanged) { monitor.onVmStateChanged(it.server) - - if (scheduler.submittedVms == submitted && scheduler.runningVms <= 1 && !reader.hasNext()) { - finish.send(Unit) } + + finished.send(Unit) } .collect() } } - finish.receive() - failureDomain?.cancel() - launch { - scheduler.terminate() + while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) { + finished.receive() } - println(simulationContext.clock.instant()) - println("${System.currentTimeMillis() - start} milliseconds") + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + reader.close() + println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") } runBlocking { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt index 7f429b89..d005a157 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt @@ -46,6 +46,7 @@ fun main() { .fields() .name("id").type().stringType().noDefault() .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() .name("maxCores").type().intType().noDefault() .name("requiredMemory").type().longType().noDefault() .endRecord() @@ -92,7 +93,6 @@ fun main() { var vmId = "" var maxCores = -1 var requiredMemory = -1L - var timestamp = -1L var cores = -1 var minTime = Long.MAX_VALUE @@ -112,7 +112,7 @@ fun main() { val values = line.split(" ") vmId = vmFile.name - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) @@ -150,13 +150,16 @@ fun main() { } } + var maxTime = Long.MIN_VALUE flopsFragments.forEach { fragment -> allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) } val metaRecord = GenericData.Record(metaSchema) metaRecord.put("id", vmId) metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) metaRecord.put("maxCores", maxCores) metaRecord.put("requiredMemory", requiredMemory) metaWriter.write(metaRecord) |
