diff options
Diffstat (limited to 'opendc')
3 files changed, 364 insertions, 254 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index bac0de21..4b8b80a8 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,66 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -class Sc20Monitor( - destination: String -) : Closeable { - private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - private val schema = SchemaBuilder - .record("slice") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() - .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() - .endRecord() - private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination)) - 
.withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - private val queue = ArrayBlockingQueue<GenericData.Record>(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - try { - while (true) { - val record = queue.take() - writer.write(record) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - writer.close() - } - } +interface Sc20Monitor : Closeable { suspend fun onVmStateChanged(server: Server) {} suspend fun serverStateChanged( @@ -70,32 +38,7 @@ class Sc20Monitor( queuedVms: Long, runningVms: Long, finishedVms: Long - ) { - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second - onSliceFinish( - simulationContext.clock.millis(), - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - submittedVms, - queuedVms, - runningVms, - finishedVms, - duration - ) - } - - println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") - - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) - } + ) {} suspend fun onSliceFinish( time: Long, @@ -112,39 +55,5 @@ class Sc20Monitor( runningVms: Long, finishedVms: Long, duration: Long = 5 * 60 * 1000L - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - val powerDraw = driver.powerDraw.first() - - val record = GenericData.Record(schema) - record.put("time", time) - record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - 
record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) - record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) - - queue.put(record) - } - - override fun close() { - // Busy loop to wait for writer thread to finish - while (queue.isNotEmpty()) { - Thread.sleep(500) - } - writerThread.interrupt() - } + ) {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt new file mode 100644 index 00000000..5e554196 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt @@ -0,0 +1,149 @@ +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread + +class Sc20ParquetMonitor( + destination: String +) : Sc20Monitor { + private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") 
+ .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + private val queue = ArrayBlockingQueue<GenericData.Record>(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } + + override suspend fun onVmStateChanged(server: Server) {} + + override suspend fun serverStateChanged( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - 
+ lastServerState.second + onSliceFinish( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun onSliceFinish( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) + + queue.put(record) + } + + override fun close() { + // Poll until the writer thread has drained the queue; give up if that thread is no longer alive, since nothing would drain the queue and this loop would never end + while (queue.isNotEmpty() && writerThread.isAlive) { + Thread.sleep(500) + } + writerThread.interrupt() + } +} diff --git 
a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index a2f609a5..6f1e9aae 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -29,11 +29,14 @@ import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy @@ -43,7 +46,9 @@ import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader import 
com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -53,15 +58,19 @@ import com.xenomachina.argparser.default import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import java.io.File import java.io.FileReader +import java.lang.IllegalArgumentException import java.util.ServiceLoader +import java.util.TreeSet import kotlin.math.max import kotlin.random.Random @@ -102,6 +111,25 @@ class ExperimentParameters(parser: ArgParser) { } /** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit>): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf<String, FaultInjector>() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + +/** * Obtain the [FaultInjector] to use for the experiments. */ fun createFaultInjector(domain: Domain, random: Random): FaultInjector { @@ -116,141 +144,89 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector { } /** - * Main entry point of the experiment. + * Create the trace reader from which the VM workloads are read. 
*/ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array<String>) { - ArgParser(args).parseInto(::ExperimentParameters).run { - println("trace-directory: $traceDirectory") - println("environment-file: $environmentFile") - println("performance-interference-file: $performanceInterferenceFile") - println("vm-placement-file: $vmPlacementFile") - println("selected-vms-file: $selectedVmsFile") - println("seed: $seed") - println("failures: $failures") - println("allocation-policy: $allocationPolicy") - - val start = System.currentTimeMillis() - val monitor = Sc20Monitor(outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - val chan = Channel<Unit>(Channel.CONFLATED) - - val vmPlacements = if (vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(vmPlacementFile!!).inputStream().buffered()).construct() - } - - val allocationPolicies = mapOf( - "mem" to AvailableMemoryAllocationPolicy(), - "mem-inv" to AvailableMemoryAllocationPolicy(true), - "core-mem" to AvailableCoreMemoryAllocationPolicy(), - "core-mem-inv" to AvailableCoreMemoryAllocationPolicy(true), - "active-servers" to NumberOfActiveServersAllocationPolicy(), - "active-servers-inv" to NumberOfActiveServersAllocationPolicy(true), - "provisioned-cores" to ProvisionedCoresAllocationPolicy(), - "provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true), - "replay" to ReplayAllocationPolicy(vmPlacements), - "random" to RandomAllocationPolicy(Random(seed)) - ) - - if (allocationPolicy !in allocationPolicies) { - println("error: unknown allocation policy $allocationPolicy") - println("Available:") - allocationPolicies.keys.forEach { key -> println(key) } - } - - root.launch { - val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) - .use { it.construct(root) } - - val performanceInterferenceStream = if (performanceInterferenceFile != null) { - 
File(performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - - val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - - println(simulationContext.clock.instant()) +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List<String>, seed: Int): Sc20ParquetTraceReader { + return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +} - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] +/** + * Construct the environment for a VM provisioner and return the provisioner instance. + */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - // Wait for the bare metal nodes to be spawned - delay(10) + // Wait for the bare metal nodes to be spawned + delay(10) - val scheduler = SimpleVirtProvisioningService( - allocationPolicies.getValue(allocationPolicy), - simulationContext, - bareMetalProvisioner - ) + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) - // Wait for the hypervisors to be spawned - delay(10) + // Wait for the hypervisors to be spawned + delay(10) - val hypervisors = scheduler.drivers() + bareMetalProvisioner to scheduler +} - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. 
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(this) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { + val domain = simulationContext.domain + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. 
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } - .launchIn(this) + } } - - val failureDomain = if (failures) { - println("ENABLING failures") - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf<String, FaultInjector>() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms + ) } - domain - } else { - null } + .launchIn(domain) + } +} +/** + * Process the trace. 
+ */ +suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, vmPlacements: Map<String, String>, monitor: Sc20Monitor) { + try { + coroutineScope { var submitted = 0L - val finished = Channel<Unit>(Channel.RENDEZVOUS) - val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) + val finished = Channel<Unit>(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + while (reader.hasNext()) { val (time, workload) = reader.next() @@ -262,9 +238,8 @@ fun main(args: Array<String>) { println("Could not find placement data in VM placement file for VM $vmId") continue } - val machinesInCluster = - hypervisors.filter { (it as SimpleVirtDriver).server.name.contains(clusterName) } - if (machinesInCluster.isEmpty()) { + val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false // true when the first host name at or after clusterName contains it; the VM is skipped only when no such host exists, matching the removed isEmpty() check + if (!machineInCluster) { println("Ignored VM") continue } @@ -294,21 +269,98 @@ fun main(args: Array<String>) { while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { finished.receive() } + } + } finally { + reader.close() + } +} - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - reader.close() - println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") +/** + * Main entry point of the experiment. 
+ */ +@OptIn(ExperimentalCoroutinesApi::class) +fun main(args: Array<String>) { + val cli = ArgParser(args).parseInto(::ExperimentParameters) + println("trace-directory: ${cli.traceDirectory}") + println("environment-file: ${cli.environmentFile}") + println("performance-interference-file: ${cli.performanceInterferenceFile}") + println("selected-vms-file: ${cli.selectedVmsFile}") + println("seed: ${cli.seed}") + println("failures: ${cli.failures}") + println("allocation-policy: ${cli.allocationPolicy}") + + val start = System.currentTimeMillis() + val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + val chan = Channel<Unit>(Channel.CONFLATED) + + val performanceInterferenceModel = try { + val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { + File(cli.performanceInterferenceFile!!).inputStream().buffered() + } else { + object {}.javaClass.getResourceAsStream("/env/performance-interference.json") } + Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() + } catch (e: Throwable) { + monitor.close() + throw e + } + val vmPlacements = if (cli.vmPlacementFile == null) { + emptyMap() + } else { + Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() + } + val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) + val traceReader = try { + createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) + } catch (e: Throwable) { + monitor.close() + throw e + } + val allocationPolicy = when (cli.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> 
NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(cli.seed)) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") + } + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - runBlocking { - system.run() - system.terminate() + val failureDomain = if (cli.failures) { + println("ENABLING failures") + createFailureDomain(cli.seed, bareMetalProvisioner, chan) + } else { + null } - // Explicitly close the monitor to flush its buffer - monitor.close() + attachMonitor(scheduler, monitor) + processTrace(traceReader, scheduler, chan, vmPlacements, monitor) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") } + + runBlocking { + system.run() + system.terminate() + } + + // Explicitly close the monitor to flush its buffer + monitor.close() } |
