diff options
14 files changed, 298 insertions, 88 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 3603ae69..c25834a7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server @@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -57,11 +59,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - public var submittedVms = 0L - public var queuedVms = 0L - public var runningVms = 0L - public var finishedVms = 0L - public var unscheduledVms = 0L + public var submittedVms = 0 + public var queuedVms = 0 + public var runningVms = 0 + public var finishedVms = 0 + public var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -71,6 +73,13 @@ class SimpleVirtProvisioningService( */ private val allocationLogic = allocationPolicy() + /** + * The [EventFlow] to emit the events. + */ + internal val eventFlow = EventFlow<VirtProvisioningEvent>() + + override val events: Flow<VirtProvisioningEvent> = eventFlow + init { launch { val provisionedNodes = provisioningService.nodes() @@ -96,8 +105,17 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { - submittedVms++ - queuedVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + )) + suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -141,7 +159,17 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - unscheduledVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + )) + incomingImages -= imageInstance logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") @@ -168,8 +196,17 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) - queuedVms-- - runningVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + )) activeImages += imageInstance server.events @@ -178,8 +215,17 @@ class SimpleVirtProvisioningService( is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - runningVms-- - finishedVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + )) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -211,6 +257,7 @@ class SimpleVirtProvisioningService( if (server in hypervisors) { // Corner case for when the hypervisor already exists availableHypervisors += hypervisors.getValue(server) + } else { val hv = HypervisorView( server.uid, @@ -223,11 +270,33 @@ class SimpleVirtProvisioningService( maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + if (incomingImages.isNotEmpty()) { requestCycle() } @@ -242,6 +311,17 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + hv.driver.events .onEach { event -> if (event is HypervisorEvent.VmsUpdated) { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt new file mode 100644 index 00000000..39f75913 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.virt.driver.VirtDriver + + +/** + * An event that is emitted by the [VirtProvisioningService]. + */ +public sealed class VirtProvisioningEvent { + /** + * The service that has emitted the event. + */ + public abstract val provisioner: VirtProvisioningService + + /** + * An event emitted for writing metrics. + */ + data class MetricsAvailable( + override val provisioner: VirtProvisioningService, + public val totalHostCount: Int, + public val availableHostCount: Int, + public val totalVmCount: Int, + public val activeVmCount: Int, + public val inactiveVmCount: Int, + public val waitingVmCount: Int, + public val failedVmCount: Int + ) : VirtProvisioningEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 2ad7df84..c4cbd711 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -5,6 +5,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. @@ -16,6 +17,11 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** + * The events emitted by the service. + */ + public val events: Flow<VirtProvisioningEvent> + + /** * Obtain the active hypervisors for this provisioner. */ public suspend fun drivers(): Set<VirtDriver> diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt index ac43b6ac..548400d6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt @@ -36,6 +36,7 @@ import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain @@ -150,13 +151,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server) hypervisor.server.events .onEach { event -> val time = clock.millis() when (event) { is ServerEvent.StateChanged -> { - reporter.reportHostStateChange(time, hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + reporter.reportHostStateChange(time, hypervisor, event.server) } } } @@ -173,11 +174,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms + event.hostServer ) } } @@ -188,6 +185,16 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex .onEach { reporter.reportPowerConsumption(hypervisor.server, it) } .launchIn(domain) } + + scheduler.events + .onEach { event -> + when (event) { + is VirtProvisioningEvent.MetricsAvailable -> + reporter.reportProvisionerMetrics(clock.millis(), event) + } + + } + .launchIn(domain) } /** @@ -197,7 +204,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP val domain = simulationContext.domain try { - var submitted = 0L + var submitted = 0 val finished = Channel<Unit>(Channel.CONFLATED) val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index 9455cb9d..7d65930c 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -33,15 +33,11 @@ import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.Closeable import java.io.File import java.util.concurrent.Executors +import java.util.concurrent.Future import java.util.concurrent.atomic.AtomicInteger import javax.sql.DataSource import kotlin.system.measureTimeMillis @@ -159,11 +155,19 @@ public class ExperimentRunner( val plan = createPlan() val total = plan.size val finished = AtomicInteger() - val dispatcher = Executors.newWorkStealingPool(parallelism).asCoroutineDispatcher() + val executorService = Executors.newCachedThreadPool() + val planIterator = plan.iterator() + val futures = mutableListOf<Future<*>>() - runBlocking { - val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!! - for (run in plan) { + while (planIterator.hasNext()) { + futures.clear() + + repeat(parallelism) { + if (!planIterator.hasNext()) { + return@repeat + } + + val run = planIterator.next() val scenarioId = scenarioIds[run.scenario]!! rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name -> @@ -171,15 +175,14 @@ public class ExperimentRunner( Sc20RawParquetTraceReader(File(tracePath, name)) } - launch(dispatcher) { - launch(mainDispatcher) { + val future = executorService.submit { + synchronized(helper) { helper.startRun(scenarioId, run.id) } logger.info { "[${finished.get()}/$total] Starting run ($scenarioId, ${run.id})" } try { - val duration = measureTimeMillis { val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) @@ -187,6 +190,7 @@ public class ExperimentRunner( try { run.scenario(run, reporter, environmentReader, traceReader) + logger.info { "Done" } } finally { reporter.close() } @@ -195,17 +199,23 @@ public class ExperimentRunner( finished.incrementAndGet() logger.info { "[${finished.get()}/$total] Finished run ($scenarioId, ${run.id}) in $duration milliseconds" } - withContext(mainDispatcher) { + synchronized(helper) { helper.finishRun(scenarioId, run.id, hasFailed = false) } } catch (e: Throwable) { logger.error("A run has failed", e) finished.incrementAndGet() - withContext(mainDispatcher) { + synchronized(helper) { helper.finishRun(scenarioId, run.id, hasFailed = true) } } } + + futures += future + } + + for (future in futures) { + future.get() } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index b072fb12..9cbfcdc1 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -29,6 +29,7 @@ import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter +import com.atlarge.opendc.experiments.sc20.reporter.PostgresProvisionerMetricsWriter import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.github.ajalt.clikt.core.CliktCommand @@ -132,6 +133,7 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { override fun run() { val ds = HikariDataSource() + ds.maximumPoolSize = Runtime.getRuntime().availableProcessors() * 3 ds.jdbcUrl = jdbcUrl ds.addDataSourceProperty("reWriteBatchedInserts", "true") @@ -178,10 +180,12 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo override fun createReporter(scenario: Long, run: Int): ExperimentReporter { val hostWriter = PostgresHostMetricsWriter(ds, batchSize) - val delegate = ExperimentPostgresReporter(scenario, run, hostWriter) + val provisionerWriter = PostgresProvisionerMetricsWriter(ds, batchSize) + val delegate = ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter) return object : ExperimentReporter by delegate { override fun close() { hostWriter.close() + provisionerWriter.close() } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt index a4a42b6e..11b35bf9 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt @@ -69,10 +69,10 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { ) override val workloads = listOf( - // Workload("solvinity", 0.1), - // Workload("solvinity", 0.25), + Workload("solvinity", 0.1), + Workload("solvinity", 0.25), Workload("solvinity", 0.5), - Workload("solvinity", 0.10) + Workload("solvinity", 1.0) ) override val operationalPhenomena = listOf( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt index 1c752cb1..049035cc 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt @@ -26,6 +26,7 @@ package com.atlarge.opendc.experiments.sc20.reporter import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import java.io.Closeable /** @@ -43,17 +44,13 @@ interface ExperimentReporter : Closeable { fun reportHostStateChange( time: Long, driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long + server: Server ) {} /** * Report the power consumption of a host. */ - fun reportPowerConsumption(host: Server, draw: Double) + fun reportPowerConsumption(host: Server, draw: Double) {} /** * This method is invoked for a host for each slice that is finishes. @@ -68,10 +65,11 @@ interface ExperimentReporter : Closeable { cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, duration: Long = 5 * 60 * 1000L ) {} + + /** + * This method is invoked for a provisioner event. + */ + fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt index 58d384e7..87bf524f 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.experiments.sc20.reporter -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.virt.driver.VirtDriver @@ -90,11 +89,7 @@ class ExperimentParquetReporter(destination: File) : override fun reportHostStateChange( time: Long, driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long + server: Server ) { logger.info("Host ${server.uid} changed state ${server.state} [$time]") @@ -111,10 +106,6 @@ class ExperimentParquetReporter(destination: File) : 0.0, 0, server, - submittedVms, - queuedVms, - runningVms, - finishedVms, duration ) @@ -141,10 +132,6 @@ class ExperimentParquetReporter(destination: File) : cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, duration: Long ) { val record = GenericData.Record(schema) @@ -161,10 +148,10 @@ class ExperimentParquetReporter(destination: File) : record.put("host_state", hostServer.state) record.put("host_usage", cpuUsage) record.put("power_draw", lastPowerConsumption[hostServer] ?: 200.0) - record.put("total_submitted_vms", submittedVms) - record.put("total_queued_vms", queuedVms) - record.put("total_running_vms", runningVms) - record.put("total_finished_vms", finishedVms) + record.put("total_submitted_vms", -1) + record.put("total_queued_vms", -1) + record.put("total_running_vms", -1) + record.put("total_finished_vms", -1) queue.put(record) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt index a92278d8..6f220640 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt @@ -24,15 +24,20 @@ package com.atlarge.opendc.experiments.sc20.reporter -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent import mu.KotlinLogging private val logger = KotlinLogging.logger {} -class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: PostgresHostMetricsWriter) : ExperimentReporter { +class ExperimentPostgresReporter( + val scenario: Long, + val run: Int, + val hostWriter: PostgresHostMetricsWriter, + val provisionerWriter: PostgresProvisionerMetricsWriter +) : ExperimentReporter { private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() override fun reportVmStateChange(time: Long, server: Server) {} @@ -40,11 +45,7 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P override fun reportHostStateChange( time: Long, driver: VirtDriver, - server: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long + server: Server ) { val lastServerState = lastServerStates[server] logger.debug("Host ${server.uid} changed state ${server.state} [$time]") @@ -61,10 +62,6 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P 0.0, 0, server, - submittedVms, - queuedVms, - runningVms, - finishedVms, duration ) @@ -93,13 +90,9 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, duration: Long ) { - writer.write( + hostWriter.write( scenario, run, HostMetrics( time, duration, @@ -116,5 +109,22 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P ) } + override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + provisionerWriter.write( + scenario, + run, + ProvisionerMetrics( + time, + event.totalHostCount, + event.availableHostCount, + event.totalVmCount, + event.activeVmCount, + event.inactiveVmCount, + event.waitingVmCount, + event.failedVmCount + ) + ) + } + override fun close() {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt index 55b80e4c..5eb55f20 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt @@ -54,4 +54,6 @@ public class PostgresHostMetricsWriter(ds: DataSource, batchSize: Int) : stmt.setDouble(13, action.metrics.cpuDemand) stmt.setDouble(14, action.metrics.powerDraw) } + + override fun toString(): String = "host-writer" } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt index 367a3a5a..33c2d40e 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt @@ -48,7 +48,7 @@ public abstract class PostgresMetricsWriter<T>( /** * The thread for the actual writer. */ - private val writerThread: Thread = thread(name = "host-metrics-writer") { run() } + private val writerThread: Thread = thread(name = "metrics-writer") { run() } /** * Write the specified metrics to the database. @@ -79,6 +79,7 @@ public abstract class PostgresMetricsWriter<T>( * Start the writer thread. */ override fun run() { + writerThread.name = toString() val conn = ds.connection var batch = 0 @@ -114,7 +115,9 @@ public abstract class PostgresMetricsWriter<T>( } } } + } finally { + stmt.executeBatch() conn.commit() conn.close() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt new file mode 100644 index 00000000..7bc88959 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.reporter + +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.Timestamp +import javax.sql.DataSource + +/** + * A [PostgresMetricsWriter] for persisting [ProvisionerMetrics]. + */ +public class PostgresProvisionerMetricsWriter(ds: DataSource, batchSize: Int) : + PostgresMetricsWriter<ProvisionerMetrics>(ds, batchSize) { + override fun createStatement(conn: Connection): PreparedStatement { + return conn.prepareStatement("INSERT INTO provisioner_metrics (scenario_id, run_id, timestamp, host_total_count, host_available_count, vm_total_count, vm_active_count, vm_inactive_count, vm_waiting_count, vm_failed_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + } + + override fun persist(action: Action.Write<ProvisionerMetrics>, stmt: PreparedStatement) { + stmt.setLong(1, action.scenario) + stmt.setInt(2, action.run) + stmt.setTimestamp(3, Timestamp(action.metrics.time)) + stmt.setInt(4, action.metrics.totalHostCount) + stmt.setInt(5, action.metrics.availableHostCount) + stmt.setInt(6, action.metrics.totalVmCount) + stmt.setInt(7, action.metrics.activeVmCount) + stmt.setInt(8, action.metrics.inactiveVmCount) + stmt.setInt(9, action.metrics.waitingVmCount) + stmt.setInt(10, action.metrics.failedVmCount) + } + + override fun toString(): String = "provisioner-writer" +} diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 5177c04a..1edb7bc2 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -158,7 +158,7 @@ class Sc20IntegrationTest { var totalOvercommissionedBurst = 0L var totalInterferedBurst = 0L - override suspend fun reportHostSlice( + override fun reportHostSlice( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -168,10 +168,6 @@ class Sc20IntegrationTest { cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, - submittedVms: Long, - queuedVms: Long, - runningVms: Long, - finishedVms: Long, duration: Long ) { totalRequestedBurst += requestedBurst |
